Add Orchestrator def ScheduleGet, ThreadStart

Create Process StopSafe and many others def
# next todo: form of schedule - has until but not has 'from'
# next todo: create def to check safe signal termination
dev-linux
Ivan Maslov 3 years ago
parent c989a55b1c
commit 17d2ac65ab

@ -2,7 +2,7 @@
# !!! ATTENTION: Backward compatibility has been started from v1.1.13 !!!
# So you can use config of the orchestrator 1.1.13 in new Orchestrator versions and all will be ok :) (hope it's true)
import win32security, json, datetime, time, copy
import schedule
# # # # # # # # # # # # # # # # # # #
# Backward compatibility Web defs up to v1.2.0
# # # # # # # # # # # # # # # # # # #
@ -511,4 +511,8 @@ def Update(inGSettings):
if "ManagersProcessDict" not in inGSettings:
inGSettings["ManagersProcessDict"]={}
if lL: lL.warning(
f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ManagersProcessDict") # Log about compatibility
f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ManagersProcessDict") # Log about compatibility
# Check "SchedulerDict": { "Schedule": schedule, # https://schedule.readthedocs.io/en/stable/examples.html
if inGSettings.get("SchedulerDict",{}).get("Schedule",None) is None:
inGSettings["SchedulerDict"]["Schedule"] = schedule
if lL: lL.warning(f"Backward compatibility (v1.2.4 to v1.2.7): Create new module schedule (schedule.readthedocs.io)") # Log about compatibility

@ -1,7 +1,7 @@
#from pyOpenRPA.Orchestrator import Managers
from .. import __Orchestrator__
import os
import time
class Process():
"""
Manager process, which is need to be started / stopped / restarted
@ -83,20 +83,48 @@ class Process():
Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return:
:return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
"""
pass
# Send activity item to agent - wait result
lCMDStr = f'taskkill /im "{self.mProcessNameWOExeStr}.exe" /fi "username eq %USERNAME%"'
lActivityItemStart = __Orchestrator__.ProcessorActivityItemCreate(
inDef="OSCMD",inArgDict={"inCMDStr": lCMDStr,"inSendOutputToOrchestratorLogsBool":False},inArgGSettingsStr="inGSettings")
lGUIDStr = __Orchestrator__.AgentActivityItemAdd(inHostNameStr=self.mAgentHostNameStr,
inUserStr=self.mAgentUserNameStr,
inActivityItemDict=lActivityItemStart)
lStartResult = __Orchestrator__.AgentActivityItemReturnGet(inGUIDStr=lGUIDStr)
if inIsManualBool==True:
self.mStatusStr = "3_STOP_SAFE_MANUAL"
else:
self.mStatusStr = "2_STOP_SAFE"
# Log info about process
self.StatusChangeLog()
# Interval check is stopped
lTimeStartFloat = time.time()
lIntervalCheckSafeStatusFLoat = 15.0
while "SAFE" in self.mStatusStr and (time.time() - lTimeStartFloat) < self.mStopSafeTimeoutSecFloat:
self.StatusCheck()
time.sleep(lIntervalCheckSafeStatusFLoat)
if "SAFE" in self.mStatusStr:
# Log info about process
lL = __Orchestrator__.OrchestratorLoggerGet()
lL.info(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Safe stop has been wait for {self.mStopSafeTimeoutSecFloat} sec. Now do the force stop.")
self.StopForce(inIsManualBool=inIsManualBool)
# Log info about process
self.StatusChangeLog()
return self.mStatusStr
def StopForce(self, inIsManualBool = True) -> str:
"""
Manual/Auto stop force. Force stop dont wait process termination - it just terminate process now.
Manual/Auto stop force. Force stop don't wait process termination - it just terminate process now.
Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
"""
# Send activity item to agent - wait result
lCMDStr = f'taskkill /im "{self.mProcessNameWOExeStr}.exe" /fi "username eq %USERNAME%"'
lCMDStr = f'taskkill /F /im "{self.mProcessNameWOExeStr}.exe" /fi "username eq %USERNAME%"'
lActivityItemStart = __Orchestrator__.ProcessorActivityItemCreate(
inDef="OSCMD",inArgDict={"inCMDStr": lCMDStr,"inSendOutputToOrchestratorLogsBool":False},inArgGSettingsStr="inGSettings")
lGUIDStr = __Orchestrator__.AgentActivityItemAdd(inHostNameStr=self.mAgentHostNameStr,
@ -117,9 +145,10 @@ class Process():
Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return:
:return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
"""
pass
self.StopSafe(inIsManualBool=inIsManualBool)
return self.Start(inIsManualBool=inIsManualBool)
def RestartForce(self, inIsManualBool = True):
"""
@ -127,9 +156,10 @@ class Process():
Manual restart will block scheduling execution. To return schedule execution use def Manual2Auto
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return:
:return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
"""
pass
self.StopForce(inIsManualBool=inIsManualBool)
return self.Start(inIsManualBool=inIsManualBool)
def StatusChangeLog(self):
"""
@ -158,6 +188,8 @@ class Process():
if self.mStatusStr == "0_STOPPED": self.mStatusStr = "4_STARTED"; lLogBool=True
if self.mStatusStr is None: self.mStatusStr = "4_STARTED"; lLogBool=True
else:
if self.mStatusStr == "2_STOP_SAFE": self.mStatusStr = "0_STOPPED"; lLogBool = True
if self.mStatusStr == "3_STOP_SAFE_MANUAL": self.mStatusStr = "1_STOPPED_MANUAL"; lLogBool = True
if self.mStatusStr == "5_STARTED_MANUAL": self.mStatusStr = "1_STOPPED_MANUAL"; lLogBool=True
if self.mStatusStr == "4_STARTED": self.mStatusStr = "0_STOPPED"; lLogBool=True
if self.mStatusStr is None: self.mStatusStr = "0_STOPPED"; lLogBool=True
@ -168,17 +200,22 @@ class Process():
"""
Check process status and run it if auto stopped self.mStatusStr is "0_STOPPED"
:return:
:return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
"""
pass
lStatusStr = self.StatusCheck()
if lStatusStr == "0_STOPPED":
self.Start(inIsManualBool=False)
return self.mStatusStr
def StatusCheckStopForce(self):
"""
Check process status and auto stop force it if self.mStatusStr is 4_STARTED
:return:
:return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
"""
pass
lStatusStr = self.StatusCheck()
if lStatusStr == "4_STARTED":
self.StopForce(inIsManualBool=False)
return self.mStatusStr
def StatusCheckStopSafe(self):
"""
@ -186,15 +223,10 @@ class Process():
:return:
"""
pass
def ScheduleWeekDay(self):
"""
Some template def to work with schedule package. Configure schedule to start. Stop process in auto mode in all sele.
:return:
"""
pass
lStatusStr = self.StatusCheck()
if lStatusStr == "4_STARTED":
self.StopSafe(inIsManualBool=False)
return self.mStatusStr
def ProcessGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) -> Process:

@ -1,4 +1,5 @@
import os, logging, datetime, sys
import schedule # https://schedule.readthedocs.io/en/stable/examples.html
# Technical def - return GSettings structure with examples
def __Create__():
@ -129,6 +130,7 @@ def __Create__():
"ActivityList": []
},
"SchedulerDict": {
"Schedule": schedule, # https://schedule.readthedocs.io/en/stable/examples.html
"CheckIntervalSecFloat": 5.0, # Check interval in seconds
"ActivityTimeList": [
# {

@ -1,6 +1,7 @@
import subprocess, json, psutil, time, os, win32security, sys, base64, logging, ctypes, copy #Get input argument
import pickle
import inspect
import schedule
from partd import Server
from . import Server
@ -471,6 +472,38 @@ def OrchestratorLoggerGet():
"""
return GSettingsGet().get("Logger",None)
def OrchestratorScheduleGet():
"""
Get the schedule (schedule.readthedocs.io) from the Orchestrator
Fro example you can use:
.. code-block:: python
# One schedule threaded
Orchestrator.OrchestratorScheduleGet().every(5).seconds.do(lProcess.StatusCheckStart)
#New schedule thread # See def description Orchestrator.OrchestratorThreadStart
Orchestrator.OrchestratorScheduleGet().every(5).seconds.do(Orchestrator.OrchestratorThreadStart,lProcess.StatusCheckStart)
:return: schedule module. Example see here https://schedule.readthedocs.io/en/stable/examples.html
"""
if GSettingsGet().get("SchedulerDict",{}).get("Schedule",None) is None:
GSettingsGet()["SchedulerDict"]["Schedule"]=schedule
return GSettingsGet().get("SchedulerDict",{}).get("Schedule",None)
def OrchestratorThreadStart(inDef, *inArgList, **inArgDict):
"""
Execute def in new thread and pass some args with list and dict types
:param inDef: Python Def
:param inArgList: args as list
:param inArgDict: args as dict
:return: threading.Thread object
"""
lDefThread = threading.Thread(target=inDef,args=inArgList,kwargs=inArgDict)
lDefThread.start()
return lDefThread
def OrchestratorIsAdmin():
"""
Check if Orchestrator process is running as administrator
@ -2562,9 +2595,6 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato
ActivityItemDefAliasModulesLoad()
#Инициализация настроечных параметров
lDaemonLoopSeconds=gSettingsDict["SchedulerDict"]["CheckIntervalSecFloat"]
lDaemonActivityLogDict={} #Словарь отработанных активностей, ключ - кортеж (<activityType>, <datetime>, <processPath || processName>, <processArgs>)
lDaemonLastDateTime=datetime.datetime.now()
gSettingsDict["ServerDict"]["WorkingDirectoryPathStr"] = os.getcwd() # Set working directory in g settings
#Инициализация сервера (инициализация всех интерфейсов)
@ -2615,69 +2645,97 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato
lProcessorMonitorThread.start() # Start the thread execution.
if lL: lL.info("Processor monitor has been started") #Logging
if lL: lL.info("Scheduler loop start") #Logging
gDaemonActivityLogDictRefreshSecInt = 10 # The second period for clear lDaemonActivityLogDict from old items
gDaemonActivityLogDictLastTime = time.time() # The second perioad for clean lDaemonActivityLogDict from old items
while True:
try:
lCurrentDateTime = datetime.datetime.now()
#Циклический обход правил
lFlagSearchActivityType=True
# Periodically clear the lDaemonActivityLogDict
if time.time()-gDaemonActivityLogDictLastTime>=gDaemonActivityLogDictRefreshSecInt:
gDaemonActivityLogDictLastTime = time.time() # Update the time
for lIndex, lItem in enumerate(lDaemonActivityLogDict):
if lItem["ActivityEndDateTime"] and lCurrentDateTime<=lItem["ActivityEndDateTime"]:
pass
# Activity is actual - do not delete now
else:
# remove the activity - not actual
lDaemonActivityLogDict.pop(lIndex,None)
lIterationLastDateTime = lDaemonLastDateTime # Get current datetime before iterator (need for iterate all activities in loop)
# Iterate throught the activity list
for lIndex, lItem in enumerate(gSettingsDict["SchedulerDict"]["ActivityTimeList"]):
try:
# Prepare GUID of the activity
lGUID = None
if "GUID" in lItem and lItem["GUID"]:
lGUID = lItem["GUID"]
else:
lGUID = str(uuid.uuid4())
lItem["GUID"]=lGUID
#Проверка дней недели, в рамках которых можно запускать активность
lItemWeekdayList=lItem.get("WeekdayList", [0, 1, 2, 3, 4, 5, 6])
if lCurrentDateTime.weekday() in lItemWeekdayList:
if lFlagSearchActivityType:
#######################################################################
#Branch 1 - if has TimeHH:MM
#######################################################################
if "TimeHH:MMStr" in lItem:
#Вид активности - запуск процесса
#Сформировать временной штамп, относительно которого надо будет проверять время
#часовой пояс пока не учитываем
lActivityDateTime=datetime.datetime.strptime(lItem["TimeHH:MMStr"],"%H:%M")
lActivityDateTime=lActivityDateTime.replace(year=lCurrentDateTime.year,month=lCurrentDateTime.month,day=lCurrentDateTime.day)
#Убедиться в том, что время наступило
if (
lActivityDateTime>=lDaemonLastDateTime and
lCurrentDateTime>=lActivityDateTime):
# Log info about activity
if lL: lL.info(f"Scheduler:: Activity list is started in new thread. Parameters are not available to see.") #Logging
# Do the activity
lThread = threading.Thread(target=Processor.ActivityListExecute, kwargs={"inGSettings": inGSettings, "inActivityList":lItem["ActivityList"]})
lThread.start()
lIterationLastDateTime = datetime.datetime.now() # Set the new datetime for the new processor activity
except Exception as e:
if lL: lL.exception(f"Scheduler: Exception has been catched in Scheduler module when activity time item was initialising. ActivityTimeItem is {lItem}")
lDaemonLastDateTime = lIterationLastDateTime # Set the new datetime for the new processor activity
#Уснуть до следующего прогона
time.sleep(lDaemonLoopSeconds)
except Exception as e:
if lL: lL.exception(f"Scheduler: Exception has been catched in Scheduler module. Global error")
# Scheduler loop
lSchedulerThread = threading.Thread(target= __deprecated_orchestrator_loop__)
lSchedulerThread.daemon = True # Run the thread in daemon mode.
lSchedulerThread.start() # Start the thread execution.
if lL: lL.info("Scheduler (old) loop start") #Logging
# Schedule (new) loop
lScheduleThread = threading.Thread(target= __schedule_loop__)
lScheduleThread.daemon = True # Run the thread in daemon mode.
lScheduleThread.start() # Start the thread execution.
if lL: lL.info("Schedule module (new) loop start") #Logging
def __schedule_loop__():
while True:
schedule.run_pending()
time.sleep(3)
# Backward compatibility below to 1.2.7
def __deprecated_orchestrator_loop__():
lL = OrchestratorLoggerGet()
inGSettings = GSettingsGet()
lDaemonLoopSeconds = gSettingsDict["SchedulerDict"]["CheckIntervalSecFloat"]
lDaemonActivityLogDict = {} # Словарь отработанных активностей, ключ - кортеж (<activityType>, <datetime>, <processPath || processName>, <processArgs>)
lDaemonLastDateTime = datetime.datetime.now()
gDaemonActivityLogDictRefreshSecInt = 10 # The second period for clear lDaemonActivityLogDict from old items
gDaemonActivityLogDictLastTime = time.time() # The second perioad for clean lDaemonActivityLogDict from old items
while True:
try:
lCurrentDateTime = datetime.datetime.now()
# Циклический обход правил
lFlagSearchActivityType = True
# Periodically clear the lDaemonActivityLogDict
if time.time() - gDaemonActivityLogDictLastTime >= gDaemonActivityLogDictRefreshSecInt:
gDaemonActivityLogDictLastTime = time.time() # Update the time
for lIndex, lItem in enumerate(lDaemonActivityLogDict):
if lItem["ActivityEndDateTime"] and lCurrentDateTime <= lItem["ActivityEndDateTime"]:
pass
# Activity is actual - do not delete now
else:
# remove the activity - not actual
lDaemonActivityLogDict.pop(lIndex, None)
lIterationLastDateTime = lDaemonLastDateTime # Get current datetime before iterator (need for iterate all activities in loop)
# Iterate throught the activity list
for lIndex, lItem in enumerate(gSettingsDict["SchedulerDict"]["ActivityTimeList"]):
try:
# Prepare GUID of the activity
lGUID = None
if "GUID" in lItem and lItem["GUID"]:
lGUID = lItem["GUID"]
else:
lGUID = str(uuid.uuid4())
lItem["GUID"] = lGUID
# Проверка дней недели, в рамках которых можно запускать активность
lItemWeekdayList = lItem.get("WeekdayList", [0, 1, 2, 3, 4, 5, 6])
if lCurrentDateTime.weekday() in lItemWeekdayList:
if lFlagSearchActivityType:
#######################################################################
# Branch 1 - if has TimeHH:MM
#######################################################################
if "TimeHH:MMStr" in lItem:
# Вид активности - запуск процесса
# Сформировать временной штамп, относительно которого надо будет проверять время
# часовой пояс пока не учитываем
lActivityDateTime = datetime.datetime.strptime(lItem["TimeHH:MMStr"], "%H:%M")
lActivityDateTime = lActivityDateTime.replace(year=lCurrentDateTime.year,
month=lCurrentDateTime.month,
day=lCurrentDateTime.day)
# Убедиться в том, что время наступило
if (
lActivityDateTime >= lDaemonLastDateTime and
lCurrentDateTime >= lActivityDateTime):
# Log info about activity
if lL: lL.info(
f"Scheduler:: Activity list is started in new thread. Parameters are not available to see.") # Logging
# Do the activity
lThread = threading.Thread(target=Processor.ActivityListExecute,
kwargs={"inGSettings": inGSettings,
"inActivityList": lItem["ActivityList"]})
lThread.start()
lIterationLastDateTime = datetime.datetime.now() # Set the new datetime for the new processor activity
except Exception as e:
if lL: lL.exception(
f"Scheduler: Exception has been catched in Scheduler module when activity time item was initialising. ActivityTimeItem is {lItem}")
lDaemonLastDateTime = lIterationLastDateTime # Set the new datetime for the new processor activity
# Уснуть до следующего прогона
time.sleep(lDaemonLoopSeconds)
except Exception as e:
if lL: lL.exception(f"Scheduler: Exception has been catched in Scheduler module. Global error")
# Backward compatibility below to 1.2.0
def __deprecated_orchestrator_start__():

Loading…
Cancel
Save