From 17d2ac65ab391f2814d3fc2515adc553c6c587ce Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Sun, 16 Jan 2022 21:03:48 +0300 Subject: [PATCH] Add Orchestrator def ScheduleGet, ThreadStart Create Process StopSafe and many others def # next todo: form of schedule - has until but not has 'from' # next todo: create def to check safe signal termination --- .../Orchestrator/BackwardCompatibility.py | 8 +- .../Orchestrator/Managers/Process.py | 78 ++++--- .../Orchestrator/SettingsTemplate.py | 2 + .../Orchestrator/__Orchestrator__.py | 190 ++++++++++++------ 4 files changed, 187 insertions(+), 91 deletions(-) diff --git a/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py b/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py index 32db006f..61e7e7d0 100644 --- a/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py +++ b/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py @@ -2,7 +2,7 @@ # !!! ATTENTION: Backward compatibility has been started from v1.1.13 !!! # So you can use config of the orchestrator 1.1.13 in new Orchestrator versions and all will be ok :) (hope it's true) import win32security, json, datetime, time, copy - +import schedule # # # # # # # # # # # # # # # # # # # # Backward compatibility Web defs up to v1.2.0 # # # # # # # # # # # # # # # # # # # @@ -511,4 +511,8 @@ def Update(inGSettings): if "ManagersProcessDict" not in inGSettings: inGSettings["ManagersProcessDict"]={} if lL: lL.warning( - f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ManagersProcessDict") # Log about compatibility \ No newline at end of file + f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ManagersProcessDict") # Log about compatibility + # Check "SchedulerDict": { "Schedule": schedule, # https://schedule.readthedocs.io/en/stable/examples.html + if inGSettings.get("SchedulerDict",{}).get("Schedule",None) is None: + inGSettings["SchedulerDict"]["Schedule"] = schedule + if lL: lL.warning(f"Backward compatibility (v1.2.4 to v1.2.7): Create new module schedule (schedule.readthedocs.io)") # Log about compatibility \ No newline at end of file diff --git a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py index e1845b2a..46078fc6 100644 --- a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py +++ b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py @@ -1,7 +1,7 @@ #from pyOpenRPA.Orchestrator import Managers from .. import __Orchestrator__ import os - +import time class Process(): """ Manager process, which is need to be started / stopped / restarted @@ -83,20 +83,48 @@ class Process(): Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto :param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation - :return: + :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ - pass + + # Send activity item to agent - wait result + lCMDStr = f'taskkill /im "{self.mProcessNameWOExeStr}.exe" /fi "username eq %USERNAME%"' + lActivityItemStart = __Orchestrator__.ProcessorActivityItemCreate( + inDef="OSCMD",inArgDict={"inCMDStr": lCMDStr,"inSendOutputToOrchestratorLogsBool":False},inArgGSettingsStr="inGSettings") + lGUIDStr = __Orchestrator__.AgentActivityItemAdd(inHostNameStr=self.mAgentHostNameStr, + inUserStr=self.mAgentUserNameStr, + inActivityItemDict=lActivityItemStart) + lStartResult = __Orchestrator__.AgentActivityItemReturnGet(inGUIDStr=lGUIDStr) + if inIsManualBool==True: + self.mStatusStr = "3_STOP_SAFE_MANUAL" + else: + self.mStatusStr = "2_STOP_SAFE" + # Log info about process + self.StatusChangeLog() + # Interval check is stopped + lTimeStartFloat = time.time() + lIntervalCheckSafeStatusFLoat = 15.0 + while "SAFE" in self.mStatusStr and (time.time() - lTimeStartFloat) < self.mStopSafeTimeoutSecFloat: + self.StatusCheck() + time.sleep(lIntervalCheckSafeStatusFLoat) + if "SAFE" in self.mStatusStr: + # Log info about process + lL = __Orchestrator__.OrchestratorLoggerGet() + lL.info(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Safe stop has been wait for {self.mStopSafeTimeoutSecFloat} sec. Now do the force stop.") + self.StopForce(inIsManualBool=inIsManualBool) + # Log info about process + self.StatusChangeLog() + return self.mStatusStr def StopForce(self, inIsManualBool = True) -> str: """ - Manual/Auto stop force. Force stop dont wait process termination - it just terminate process now. + Manual/Auto stop force. Force stop don't wait process termination - it just terminate process now. Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto :param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ # Send activity item to agent - wait result - lCMDStr = f'taskkill /im "{self.mProcessNameWOExeStr}.exe" /fi "username eq %USERNAME%"' + lCMDStr = f'taskkill /F /im "{self.mProcessNameWOExeStr}.exe" /fi "username eq %USERNAME%"' lActivityItemStart = __Orchestrator__.ProcessorActivityItemCreate( inDef="OSCMD",inArgDict={"inCMDStr": lCMDStr,"inSendOutputToOrchestratorLogsBool":False},inArgGSettingsStr="inGSettings") lGUIDStr = __Orchestrator__.AgentActivityItemAdd(inHostNameStr=self.mAgentHostNameStr, @@ -117,9 +145,10 @@ class Process(): Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto :param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation - :return: + :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ - pass + self.StopSafe(inIsManualBool=inIsManualBool) + return self.Start(inIsManualBool=inIsManualBool) def RestartForce(self, inIsManualBool = True): """ @@ -127,9 +156,10 @@ class Process(): Manual restart will block scheduling execution. To return schedule execution use def Manual2Auto :param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation - :return: + :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ - pass + self.StopForce(inIsManualBool=inIsManualBool) + return self.Start(inIsManualBool=inIsManualBool) def StatusChangeLog(self): """ @@ -158,6 +188,8 @@ class Process(): if self.mStatusStr == "0_STOPPED": self.mStatusStr = "4_STARTED"; lLogBool=True if self.mStatusStr is None: self.mStatusStr = "4_STARTED"; lLogBool=True else: + if self.mStatusStr == "2_STOP_SAFE": self.mStatusStr = "0_STOPPED"; lLogBool = True + if self.mStatusStr == "3_STOP_SAFE_MANUAL": self.mStatusStr = "1_STOPPED_MANUAL"; lLogBool = True if self.mStatusStr == "5_STARTED_MANUAL": self.mStatusStr = "1_STOPPED_MANUAL"; lLogBool=True if self.mStatusStr == "4_STARTED": self.mStatusStr = "0_STOPPED"; lLogBool=True if self.mStatusStr is None: self.mStatusStr = "0_STOPPED"; lLogBool=True @@ -168,17 +200,22 @@ class Process(): """ Check process status and run it if auto stopped self.mStatusStr is "0_STOPPED" - :return: + :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ - pass - + lStatusStr = self.StatusCheck() + if lStatusStr == "0_STOPPED": + self.Start(inIsManualBool=False) + return self.mStatusStr def StatusCheckStopForce(self): """ Check process status and auto stop force it if self.mStatusStr is 4_STARTED - :return: + :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ - pass + lStatusStr = self.StatusCheck() + if lStatusStr == "4_STARTED": + self.StopForce(inIsManualBool=False) + return self.mStatusStr def StatusCheckStopSafe(self): """ @@ -186,15 +223,10 @@ class Process(): :return: """ - pass - - def ScheduleWeekDay(self): - """ - Some template def to work with schedule package. Configure schedule to start. Stop process in auto mode in all sele. - - :return: - """ - pass + lStatusStr = self.StatusCheck() + if lStatusStr == "4_STARTED": + self.StopSafe(inIsManualBool=False) + return self.mStatusStr def ProcessGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) -> Process: diff --git a/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py b/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py index 9a1a17fe..ddb3a3dd 100644 --- a/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py +++ b/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py @@ -1,4 +1,5 @@ import os, logging, datetime, sys +import schedule # https://schedule.readthedocs.io/en/stable/examples.html # Technical def - return GSettings structure with examples def __Create__(): @@ -129,6 +130,7 @@ def __Create__(): "ActivityList": [] }, "SchedulerDict": { + "Schedule": schedule, # https://schedule.readthedocs.io/en/stable/examples.html "CheckIntervalSecFloat": 5.0, # Check interval in seconds "ActivityTimeList": [ # { diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index e3672f8d..e750e850 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -1,6 +1,7 @@ import subprocess, json, psutil, time, os, win32security, sys, base64, logging, ctypes, copy #Get input argument import pickle import inspect +import schedule from partd import Server from . import Server @@ -471,6 +472,38 @@ def OrchestratorLoggerGet(): """ return GSettingsGet().get("Logger",None) +def OrchestratorScheduleGet(): + """ + Get the schedule (schedule.readthedocs.io) from the Orchestrator + + Fro example you can use: + + .. code-block:: python + # One schedule threaded + Orchestrator.OrchestratorScheduleGet().every(5).seconds.do(lProcess.StatusCheckStart) + + #New schedule thread # See def description Orchestrator.OrchestratorThreadStart + Orchestrator.OrchestratorScheduleGet().every(5).seconds.do(Orchestrator.OrchestratorThreadStart,lProcess.StatusCheckStart) + + :return: schedule module. Example see here https://schedule.readthedocs.io/en/stable/examples.html + """ + if GSettingsGet().get("SchedulerDict",{}).get("Schedule",None) is None: + GSettingsGet()["SchedulerDict"]["Schedule"]=schedule + return GSettingsGet().get("SchedulerDict",{}).get("Schedule",None) + +def OrchestratorThreadStart(inDef, *inArgList, **inArgDict): + """ + Execute def in new thread and pass some args with list and dict types + + :param inDef: Python Def + :param inArgList: args as list + :param inArgDict: args as dict + :return: threading.Thread object + """ + lDefThread = threading.Thread(target=inDef,args=inArgList,kwargs=inArgDict) + lDefThread.start() + return lDefThread + def OrchestratorIsAdmin(): """ Check if Orchestrator process is running as administrator @@ -2562,9 +2595,6 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato ActivityItemDefAliasModulesLoad() #Инициализация настроечных параметров - lDaemonLoopSeconds=gSettingsDict["SchedulerDict"]["CheckIntervalSecFloat"] - lDaemonActivityLogDict={} #Словарь отработанных активностей, ключ - кортеж (, , , ) - lDaemonLastDateTime=datetime.datetime.now() gSettingsDict["ServerDict"]["WorkingDirectoryPathStr"] = os.getcwd() # Set working directory in g settings #Инициализация сервера (инициализация всех интерфейсов) @@ -2615,69 +2645,97 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato lProcessorMonitorThread.start() # Start the thread execution. if lL: lL.info("Processor monitor has been started") #Logging - if lL: lL.info("Scheduler loop start") #Logging - gDaemonActivityLogDictRefreshSecInt = 10 # The second period for clear lDaemonActivityLogDict from old items - gDaemonActivityLogDictLastTime = time.time() # The second perioad for clean lDaemonActivityLogDict from old items - - - - while True: - try: - lCurrentDateTime = datetime.datetime.now() - #Циклический обход правил - lFlagSearchActivityType=True - # Periodically clear the lDaemonActivityLogDict - if time.time()-gDaemonActivityLogDictLastTime>=gDaemonActivityLogDictRefreshSecInt: - gDaemonActivityLogDictLastTime = time.time() # Update the time - for lIndex, lItem in enumerate(lDaemonActivityLogDict): - if lItem["ActivityEndDateTime"] and lCurrentDateTime<=lItem["ActivityEndDateTime"]: - pass - # Activity is actual - do not delete now - else: - # remove the activity - not actual - lDaemonActivityLogDict.pop(lIndex,None) - lIterationLastDateTime = lDaemonLastDateTime # Get current datetime before iterator (need for iterate all activities in loop) - # Iterate throught the activity list - for lIndex, lItem in enumerate(gSettingsDict["SchedulerDict"]["ActivityTimeList"]): - try: - # Prepare GUID of the activity - lGUID = None - if "GUID" in lItem and lItem["GUID"]: - lGUID = lItem["GUID"] - else: - lGUID = str(uuid.uuid4()) - lItem["GUID"]=lGUID - - #Проверка дней недели, в рамках которых можно запускать активность - lItemWeekdayList=lItem.get("WeekdayList", [0, 1, 2, 3, 4, 5, 6]) - if lCurrentDateTime.weekday() in lItemWeekdayList: - if lFlagSearchActivityType: - ####################################################################### - #Branch 1 - if has TimeHH:MM - ####################################################################### - if "TimeHH:MMStr" in lItem: - #Вид активности - запуск процесса - #Сформировать временной штамп, относительно которого надо будет проверять время - #часовой пояс пока не учитываем - lActivityDateTime=datetime.datetime.strptime(lItem["TimeHH:MMStr"],"%H:%M") - lActivityDateTime=lActivityDateTime.replace(year=lCurrentDateTime.year,month=lCurrentDateTime.month,day=lCurrentDateTime.day) - #Убедиться в том, что время наступило - if ( - lActivityDateTime>=lDaemonLastDateTime and - lCurrentDateTime>=lActivityDateTime): - # Log info about activity - if lL: lL.info(f"Scheduler:: Activity list is started in new thread. Parameters are not available to see.") #Logging - # Do the activity - lThread = threading.Thread(target=Processor.ActivityListExecute, kwargs={"inGSettings": inGSettings, "inActivityList":lItem["ActivityList"]}) - lThread.start() - lIterationLastDateTime = datetime.datetime.now() # Set the new datetime for the new processor activity - except Exception as e: - if lL: lL.exception(f"Scheduler: Exception has been catched in Scheduler module when activity time item was initialising. ActivityTimeItem is {lItem}") - lDaemonLastDateTime = lIterationLastDateTime # Set the new datetime for the new processor activity - #Уснуть до следующего прогона - time.sleep(lDaemonLoopSeconds) - except Exception as e: - if lL: lL.exception(f"Scheduler: Exception has been catched in Scheduler module. Global error") + # Scheduler loop + lSchedulerThread = threading.Thread(target= __deprecated_orchestrator_loop__) + lSchedulerThread.daemon = True # Run the thread in daemon mode. + lSchedulerThread.start() # Start the thread execution. + if lL: lL.info("Scheduler (old) loop start") #Logging + + + # Schedule (new) loop + lScheduleThread = threading.Thread(target= __schedule_loop__) + lScheduleThread.daemon = True # Run the thread in daemon mode. + lScheduleThread.start() # Start the thread execution. + if lL: lL.info("Schedule module (new) loop start") #Logging + + +def __schedule_loop__(): + while True: + schedule.run_pending() + time.sleep(3) + +# Backward compatibility below to 1.2.7 +def __deprecated_orchestrator_loop__(): + lL = OrchestratorLoggerGet() + inGSettings = GSettingsGet() + lDaemonLoopSeconds = gSettingsDict["SchedulerDict"]["CheckIntervalSecFloat"] + lDaemonActivityLogDict = {} # Словарь отработанных активностей, ключ - кортеж (, , , ) + lDaemonLastDateTime = datetime.datetime.now() + gDaemonActivityLogDictRefreshSecInt = 10 # The second period for clear lDaemonActivityLogDict from old items + gDaemonActivityLogDictLastTime = time.time() # The second perioad for clean lDaemonActivityLogDict from old items + while True: + try: + lCurrentDateTime = datetime.datetime.now() + # Циклический обход правил + lFlagSearchActivityType = True + # Periodically clear the lDaemonActivityLogDict + if time.time() - gDaemonActivityLogDictLastTime >= gDaemonActivityLogDictRefreshSecInt: + gDaemonActivityLogDictLastTime = time.time() # Update the time + for lIndex, lItem in enumerate(lDaemonActivityLogDict): + if lItem["ActivityEndDateTime"] and lCurrentDateTime <= lItem["ActivityEndDateTime"]: + pass + # Activity is actual - do not delete now + else: + # remove the activity - not actual + lDaemonActivityLogDict.pop(lIndex, None) + lIterationLastDateTime = lDaemonLastDateTime # Get current datetime before iterator (need for iterate all activities in loop) + # Iterate throught the activity list + for lIndex, lItem in enumerate(gSettingsDict["SchedulerDict"]["ActivityTimeList"]): + try: + # Prepare GUID of the activity + lGUID = None + if "GUID" in lItem and lItem["GUID"]: + lGUID = lItem["GUID"] + else: + lGUID = str(uuid.uuid4()) + lItem["GUID"] = lGUID + + # Проверка дней недели, в рамках которых можно запускать активность + lItemWeekdayList = lItem.get("WeekdayList", [0, 1, 2, 3, 4, 5, 6]) + if lCurrentDateTime.weekday() in lItemWeekdayList: + if lFlagSearchActivityType: + ####################################################################### + # Branch 1 - if has TimeHH:MM + ####################################################################### + if "TimeHH:MMStr" in lItem: + # Вид активности - запуск процесса + # Сформировать временной штамп, относительно которого надо будет проверять время + # часовой пояс пока не учитываем + lActivityDateTime = datetime.datetime.strptime(lItem["TimeHH:MMStr"], "%H:%M") + lActivityDateTime = lActivityDateTime.replace(year=lCurrentDateTime.year, + month=lCurrentDateTime.month, + day=lCurrentDateTime.day) + # Убедиться в том, что время наступило + if ( + lActivityDateTime >= lDaemonLastDateTime and + lCurrentDateTime >= lActivityDateTime): + # Log info about activity + if lL: lL.info( + f"Scheduler:: Activity list is started in new thread. Parameters are not available to see.") # Logging + # Do the activity + lThread = threading.Thread(target=Processor.ActivityListExecute, + kwargs={"inGSettings": inGSettings, + "inActivityList": lItem["ActivityList"]}) + lThread.start() + lIterationLastDateTime = datetime.datetime.now() # Set the new datetime for the new processor activity + except Exception as e: + if lL: lL.exception( + f"Scheduler: Exception has been catched in Scheduler module when activity time item was initialising. ActivityTimeItem is {lItem}") + lDaemonLastDateTime = lIterationLastDateTime # Set the new datetime for the new processor activity + # Уснуть до следующего прогона + time.sleep(lDaemonLoopSeconds) + except Exception as e: + if lL: lL.exception(f"Scheduler: Exception has been catched in Scheduler module. Global error") # Backward compatibility below to 1.2.0 def __deprecated_orchestrator_start__():