From ac160e85627d3fc4af63a66870e005f14386bc57 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Tue, 8 Feb 2022 22:30:28 +0300 Subject: [PATCH] init js after data render, parallel init py, new def wait orc init, logs optimization. Processor add ThreadBool flag. Fix in managers.Process when check status of not initialized agent --- Sources/pyOpenRPA/Orchestrator/Core.py | 2 +- .../Orchestrator/Managers/ControlPanel.py | 5 ++ .../Orchestrator/Managers/Process.py | 2 +- Sources/pyOpenRPA/Orchestrator/Processor.py | 32 ++++++++++-- .../pyOpenRPA/Orchestrator/ServerSettings.py | 6 ++- Sources/pyOpenRPA/Orchestrator/Web/Index.js | 3 +- .../Orchestrator/__Orchestrator__.py | 50 ++++++++++++++++--- 7 files changed, 83 insertions(+), 17 deletions(-) diff --git a/Sources/pyOpenRPA/Orchestrator/Core.py b/Sources/pyOpenRPA/Orchestrator/Core.py index 329386fe..f92f7442 100644 --- a/Sources/pyOpenRPA/Orchestrator/Core.py +++ b/Sources/pyOpenRPA/Orchestrator/Core.py @@ -4,7 +4,7 @@ import threading def IsProcessorThread(inGSettings): return inGSettings["ProcessorDict"]["ThreadIdInt"] == threading.get_ident() -def IsOrchestratorInitialized(inGSettings): +def IsOrchestratorInitialized(inGSettings) -> bool: """ Check if Orchestrator will be successfully initialized diff --git a/Sources/pyOpenRPA/Orchestrator/Managers/ControlPanel.py b/Sources/pyOpenRPA/Orchestrator/Managers/ControlPanel.py index 288f0afd..cef4acd4 100644 --- a/Sources/pyOpenRPA/Orchestrator/Managers/ControlPanel.py +++ b/Sources/pyOpenRPA/Orchestrator/Managers/ControlPanel.py @@ -20,6 +20,8 @@ class ControlPanel(): # Usage example: lCPManager = Orchestrator.Managers.ControlPanel(inControlPanelNameStr="TestControlPanel", inRefreshHTMLJinja2TemplatePathStr="ControlPanel\\test.html", inJinja2TemplateRefreshBool = True) + + If you use Jinja2 you can use next data context: StorageRobotDict: Orchestrator.StorageRobotGet(inRobotNameStr=self.mRobotNameStr), @@ -33,6 +35,9 @@ class ControlPanel(): OperatorModule: operator, MathModule: math + You can modify jinja context by use the function: + Jinja2DataUpdateDictSet + .. code-block:: html Hello my control panel! You can use any def from Orchestrator module here in Jinja2 HTML template: diff --git a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py index 862a53c2..003ac575 100644 --- a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py +++ b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py @@ -97,7 +97,7 @@ class Process(): self.mProcessNameWOExeStr = inProcessNameWOExeStr self.mStopSafeTimeoutSecFloat = inStopSafeTimeoutSecFloat lGS["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self - lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[]) + lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[], inThreadBool=True) __Orchestrator__.ProcessorActivityItemAppend(inActivityItemDict=lActivityDict) else: raise Exception(f"Managers.Process ({inAgentHostNameStr}, {inAgentUserNameStr}, {inProcessNameWOExeStr}): Can't init the Process instance because it already inited in early (see ProcessInitSafe)") diff --git a/Sources/pyOpenRPA/Orchestrator/Processor.py b/Sources/pyOpenRPA/Orchestrator/Processor.py index 0f0b61e4..fd797a16 100644 --- a/Sources/pyOpenRPA/Orchestrator/Processor.py +++ b/Sources/pyOpenRPA/Orchestrator/Processor.py @@ -27,14 +27,40 @@ def ProcessorRunSync(inGSettings, inRobotRDPThreadControlDict): if len(lActivityList)>0: if lL: lL.debug(f'Processor ActivityList len: {len(lActivityList)}') lActivityItem = inGSettings["ProcessorDict"]["ActivityList"].pop(0) # Extract the first item from processor queue - inRobotRDPThreadControlDict["ThreadExecuteBool"]=False # Stop the RobotRDPActive monitoring - ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item - inRobotRDPThreadControlDict["ThreadExecuteBool"] = True # Continue the RobotRDPActive monitoring + if lActivityItem.get("ThreadBool", False) is False: + inRobotRDPThreadControlDict["ThreadExecuteBool"]=False # Stop the RobotRDPActive monitoring + ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item + inRobotRDPThreadControlDict["ThreadExecuteBool"] = True # Continue the RobotRDPActive monitoring + else: + ProcessorRunAsync(inGSettings = inGSettings, inActivityList=[lActivityItem]) else: time.sleep(inGSettings["ProcessorDict"]["CheckIntervalSecFloat"]) # Sleep when list is empty except Exception as e: if lL: lL.exception(f"Processor.ProcessorRunSync. Something goes very wrong in processor queue. See traceback") +# Run processor Async +def ProcessorRunAsync(inGSettings, inActivityList): + """ + "inActivityList": [ # List of the activities + # { + # "Def":"DefAliasTest", # def link or def alias (look gSettings["Processor"]["AliasDefDict"]) + # "ArgList":[1,2,3], # Args list + # "ArgDict":{"ttt":1,"222":2,"dsd":3}, # Args dictionary + # "ArgGSettings": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # "ArgLogger": None, # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # "GUIDStr": "sadasd-asdas-d-asdasd", # ActivityItem GUID which identify the Activity + # "ThreadBool": True + # }, + """ + def __process__(inGSettings, inActivityList): + for lActivityItem in inActivityList: + lL = inGSettings["Logger"] # Logger alias + if lL: lL.debug(f'ActivityItem in new thread') + lResultList = ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item + # Start in new thread + lThread = threading.Thread(target=__process__,kwargs={"inGSettings": inGSettings, "inActivityList": inActivityList}) + lThread.start() + # Execute ActivityItem list # return the def result def ActivityListExecute(inGSettings, inActivityList): diff --git a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py index 12112d6b..4408c569 100644 --- a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py +++ b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py @@ -334,7 +334,6 @@ def pyOpenRPA_Agent_O2A(inRequest, inGSettings): lThisAgentDict["IsListenBool"] = True # Set is online lQueueList = lThisAgentDict["ActivityList"] if len(lQueueList)>0:# Do some operations if has queue items - if lL: lL.debug(f'O2A: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}') # check if delta datetime is < than ActivityLifeTimeSecFloat lActivityItem = lThisAgentDict["ActivityList"][0] lActivityLifetimeSecFloat = (datetime.datetime.now() - lActivityItem["CreatedByDatetime"]).total_seconds() @@ -425,7 +424,10 @@ def pyOpenRPA_Agent_A2O(inRequest, inGSettings): lActivityReturnItemValue = lInput["ActivityReturnDict"][lActivityReturnItemKeyStr] # Create item in gSettings inGSettings["AgentActivityReturnDict"][lActivityReturnItemKeyStr]=SettingsTemplate.__AgentActivityReturnDictItemCreate__(inReturn=lActivityReturnItemValue) - if lL: lL.debug(f"SERVER: pyOpenRPA_Agent_A2O:: Has recieved result of the activity items from agent! ActivityItem GUID Str: {lActivityReturnItemKeyStr}; Return value len: {len(lActivityReturnItemValue)}") + lLogStr = "0 bytes" + if lActivityReturnItemValue is not None: + lLogStr = f"{len(lActivityReturnItemValue)} bytes" + if lL: lL.debug(f"SERVER: pyOpenRPA_Agent_A2O:: Has recieved result of the activity items from agent! ActivityItem GUID Str: {lActivityReturnItemKeyStr}; Return value len: {lLogStr}") # Delete the source activity item from AgentDict if lAgentDictItemKeyTurple in inGSettings["AgentDict"]: lAgentDictActivityListNew = [] diff --git a/Sources/pyOpenRPA/Orchestrator/Web/Index.js b/Sources/pyOpenRPA/Orchestrator/Web/Index.js index 7c78a4c4..07fe9ab2 100644 --- a/Sources/pyOpenRPA/Orchestrator/Web/Index.js +++ b/Sources/pyOpenRPA/Orchestrator/Web/Index.js @@ -888,11 +888,10 @@ $(document).ready(function() { } /// v1.2.0 pyOpenRPA Init defs - mGlobal.pyOpenRPA.ServerJSInitDef(); // Recieve JS from server (if exist) and then call anothe url ServerData mGlobal.pyOpenRPA.ServerDataRefreshDef(); // Init the refresh data def from server side mGlobal.pyOpenRPA.ServerLogListRefreshDef(); // Init the refresh data def from the log window mGlobal.pyOpenRPA.ServerLogListDoRenderTrue(); // Init button to freeze/unfreeze textare with logs - + mGlobal.pyOpenRPA.ServerJSInitDef(); // Recieve JS from server (if exist) and then call anothe url ServerData //$('.ui.dropdown').dropdown(); //////////////////////////////////////////// diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index 61d0a9f9..2916dd7a 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -541,6 +541,24 @@ def OrchestratorIsAdmin(): except: return False +def OrchestratorIsInited() -> bool: + """Check if Orchestrator initial actions were processed + + :return: True - orc is already inited; False - else + :rtype: bool + """ + + return Core.IsOrchestratorInitialized(inGSettings=GSettingsGet()) + +def OrchestratorInitWait() -> None: + """Wait thread while orc will process initial action. + ATTENTION: DO NOT CALL THIS DEF IN THREAD WHERE ORCHESTRATOR MUST BE INITIALIZED - INFINITE LOOP + """ + lIntervalSecFloat = 0.5 + while not OrchestratorIsInited(): + time.sleep(lIntervalSecFloat) + + def OrchestratorRerunAsAdmin(): """ Check if not admin - then rerun orchestrator as administrator @@ -553,10 +571,10 @@ def OrchestratorRerunAsAdmin(): else: print(f"!SKIPPED! Already run as administrator!") -def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSettingsStr = None): +def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSettingsStr = None, inAsyncInitBool = False): """ Search the py files by the glob and do the safe init (in try except). Also add inited module in sys.modules as imported (module name = file name without extension). - + You can init CP in async way! .. code-block:: python # USAGE VAR 1 (without the def auto call) @@ -580,14 +598,14 @@ def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSet :param inGlobPatternStr: example"..\\*\\*\\*X64*.cmd" :param inDefStr: OPTIONAL The string name of the def. For backward compatibility if you need to auto call some def from initialized module :param inDefArgNameGSettingsStr: OPTIONAL The name of the GSettings argument in def (if exists) + :param inAsyncInitBool: OPTIONAL True - init py modules in many threads - parallel execution. False (default) - sequence execution :return: { "ModuleNameStr":{"PyPathStr": "", "Module": ...}, ...} """ - lResultDict = {} - lPyPathStrList = glob.glob(inGlobPatternStr) # get the file list - lL = OrchestratorLoggerGet() # get the logger - for lPyPathItemStr in lPyPathStrList: + # # # # # # # # + def __execute__(inResultDict, inPyPathItemStr, inDefStr = None, inDefArgNameGSettingsStr = None): try: + lPyPathItemStr = inPyPathItemStr lModuleNameStr = os.path.basename(lPyPathItemStr)[0:-3] lTechSpecification = importlib.util.spec_from_file_location(lModuleNameStr, lPyPathItemStr) lTechModuleFromSpec = importlib.util.module_from_spec(lTechSpecification) @@ -595,7 +613,7 @@ def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSet lTechSpecificationModuleLoader = lTechSpecification.loader.exec_module(lTechModuleFromSpec) lItemDict = {"ModuleNameStr": lModuleNameStr, "PyPathStr": lPyPathItemStr, "Module": lTechModuleFromSpec} if lL: lL.info(f"Py module {lModuleNameStr} has been successfully initialized.") - lResultDict[lModuleNameStr]=lItemDict + inResultDict[lModuleNameStr]=lItemDict # Backward compatibility to call def with gsettings when init if inDefStr is not None and inDefStr is not "": lDef = getattr(lTechModuleFromSpec, inDefStr) @@ -605,6 +623,22 @@ def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSet lDef(**lArgDict) except Exception as e: if lL: lL.exception(f"Exception when init the .py file {os.path.abspath(lPyPathItemStr)}") + # # # # # # # # + + lResultDict = {} + + lPyPathStrList = glob.glob(inGlobPatternStr) # get the file list + lL = OrchestratorLoggerGet() # get the logger + for lPyPathItemStr in lPyPathStrList: + if inAsyncInitBool == True: + # ASYNC EXECUTION + lThreadInit = threading.Thread(target=__execute__,kwargs={ + "inResultDict":lResultDict, "inPyPathItemStr": lPyPathItemStr, + "inDefStr": inDefStr, "inDefArgNameGSettingsStr": inDefArgNameGSettingsStr}, daemon=True) + lThreadInit.start() + else: + # SYNC EXECUTION + __execute__(inResultDict=lResultDict, inPyPathItemStr=lPyPathItemStr, inDefStr = inDefStr, inDefArgNameGSettingsStr = inDefArgNameGSettingsStr) return lResultDict def OrchestratorSessionSave(inGSettings=None): @@ -2015,6 +2049,7 @@ def RDPTemplateCreate(inLoginStr, inPasswordStr, inHostStr="127.0.0.1", inPortIn """ if inSharedDriveList is None: inSharedDriveList = ["c"] + if inPortInt is None: inPortInt = 3389 lRDPTemplateDict= { # Init the configuration item "Host": inHostStr, # Host address, example "77.77.22.22" "Port": str(inPortInt), # RDP Port, example "3389" @@ -2573,7 +2608,6 @@ def GSettingsAutocleaner(inGSettings=None): while True: time.sleep(inGSettings["Autocleaner"]["IntervalSecFloat"]) # Wait for the next iteration lL = inGSettings["Logger"] - if lL: lL.info(f"Autocleaner is running") # Info lNowDatetime = datetime.datetime.now() # Get now time # Clean old items in Client > Session > TechnicalSessionGUIDCache lTechnicalSessionGUIDCacheNew = {}