From cd3e3211ff29c4d181851b2a3e9908be848db316 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Thu, 23 Dec 2021 15:50:38 +0300 Subject: [PATCH] draft 1.2.7 (need test) # Add "ThreadBool" flag in ActivityItem - run in async mode some activity --- Orchestrator/OrchestratorSettings.py | 4 +-- Sources/pyOpenRPA/Agent/O2A.py | 7 ++++- Sources/pyOpenRPA/Agent/Processor.py | 30 +++++++++++++++++++ .../pyOpenRPA/Orchestrator/ServerSettings.py | 27 ++++++++++++++--- .../Orchestrator/__Orchestrator__.py | 6 ++-- 5 files changed, 65 insertions(+), 9 deletions(-) diff --git a/Orchestrator/OrchestratorSettings.py b/Orchestrator/OrchestratorSettings.py index 304f500c..05a19629 100644 --- a/Orchestrator/OrchestratorSettings.py +++ b/Orchestrator/OrchestratorSettings.py @@ -14,8 +14,8 @@ if not Orchestrator.OrchestratorIsAdmin(): Orchestrator.OrchestratorRerunAsAdmin() print(f"Orchestrator will be run as administrator!") elif __name__ == "__main__": # New init way - allow run as module -m PyOpenRPA.Orchestrator - #gSettings = Orchestrator.GSettingsGet() - gSettings = SettingsTemplate.Create(inModeStr="BASIC") # Create GSettings with basic configuration - no more config is available from the box - you can create own + gSettings = Orchestrator.GSettingsGet() + #gSettings = SettingsTemplate.Create(inModeStr="BASIC") # Create GSettings with basic configuration - no more config is available from the box - you can create own # TEST Add User ND - Add Login ND to superuser of the Orchestrator lUACClientDict = SettingsTemplate.__UACClientAdminCreate__() diff --git a/Sources/pyOpenRPA/Agent/O2A.py b/Sources/pyOpenRPA/Agent/O2A.py index 0994a0c0..f416c462 100644 --- a/Sources/pyOpenRPA/Agent/O2A.py +++ b/Sources/pyOpenRPA/Agent/O2A.py @@ -1,4 +1,5 @@ import requests, time, json +from . import Processor # O2A - Data flow Orchestrator to Agent # f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/O2A" @@ -37,7 +38,11 @@ def O2A_Loop(inGSettings): lQueueItem = lResponse.json() # Try to get JSON # Append QUEUE item in ProcessorDict > ActivityList lActivityLastGUIDStr = lQueueItem["GUIDStr"] - inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) + # Check if ActivityItem ["ThreadBool"] = False > go sync mode in processor queue; Else: New thread + if lQueueItem.get("ThreadBool",False) == False: + inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) + else: + Processor.ProcessorRunAsync(inGSettings=inGSettings,inActivityList=[lQueueItem]) # Log full version if bytes size is less than limit . else short lAgentLimitLogSizeBytesInt = 500 if lBodyLenInt <= lAgentLimitLogSizeBytesInt: diff --git a/Sources/pyOpenRPA/Agent/Processor.py b/Sources/pyOpenRPA/Agent/Processor.py index a96a973f..0fe9a965 100644 --- a/Sources/pyOpenRPA/Agent/Processor.py +++ b/Sources/pyOpenRPA/Agent/Processor.py @@ -13,6 +13,7 @@ def ProcessorRunSync(inGSettings): # "ArgGSettings": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) # "ArgLogger": None, # Name of GSettings attribute: str (ArgDict) or index (for ArgList) # "GUIDStr": "sadasd-asdas-d-asdasd", # ActivityItem GUID which identify the Activity + # "ThreadBool": False # }, ], "AliasDefDict": {}, # Storage for def with Str alias. To use it see pyOpenRPA.Orchestrator.ControlPanel @@ -36,6 +37,35 @@ def ProcessorRunSync(inGSettings): else: time.sleep(inGSettings["ProcessorDict"]["CheckIntervalSecFloat"]) # Sleep when list is empty +# Run processor Async +def ProcessorRunAsync(inGSettings, inActivityList): + """ + "inActivityList": [ # List of the activities + # { + # "Def":"DefAliasTest", # def link or def alias (look gSettings["Processor"]["AliasDefDict"]) + # "ArgList":[1,2,3], # Args list + # "ArgDict":{"ttt":1,"222":2,"dsd":3}, # Args dictionary + # "ArgGSettings": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # "ArgLogger": None, # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # "GUIDStr": "sadasd-asdas-d-asdasd", # ActivityItem GUID which identify the Activity + # "ThreadBool": True + # }, + """ + def __process__(inGSettings, inActivityList): + for lActivityItem in inActivityList: + lL = inGSettings["Logger"] # Logger alias + if lL: lL.debug(f'ActivityItem in new thread') + lResultList = ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item + #Some help code + if len(lResultList) == 0: lResultList= [None] + # Send result to Orc if we have GUIDStr + if "GUIDStr" in lActivityItem: + # Def to send to Orc + A2O.ActivityReturnDictSend(inGSettings=inGSettings, inActivityItemGUIDStr=lActivityItem["GUIDStr"],inReturn=lResultList[0]) + # Start in new thread + lThread = threading.Thread(target=__process__,kwargs={"inGSettings": inGSettings, "inActivityList": inActivityList}) + lThread.start() + # Execute ActivityItem list # return the def result def ActivityListExecute(inGSettings, inActivityList): diff --git a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py index ed09b85f..803e735a 100644 --- a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py +++ b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py @@ -281,8 +281,22 @@ def pyOpenRPA_Processor(inRequest, inGSettings): lActivityTypeListStr = "Has some error with Activity Type read" lWebAuditMessageStr = __Orchestrator__.WebAuditMessageCreate(inRequest=inRequest,inOperationCodeStr=lActivityTypeListStr, inMessageStr="pyOpenRPA_Processor") if lL: lL.info(lWebAuditMessageStr) - # Append in list - inGSettings["ProcessorDict"]["ActivityList"]+=lInput + # Separate into 2 lists - sync and async + lSyncActvityList = [] + lAsyncActivityList = [] + for lActivityItem in lInput: + if lInput.get("ThreadBool", False) == False: + lSyncActvityList.append(lActivityItem) + else: + lAsyncActivityList.append(lActivityItem) + # Sync: Append in list + inGSettings["ProcessorDict"]["ActivityList"]+=lSyncActvityList + # Async: go to run + if len(lAsyncActivityList)>0: + for lActivityItem in lAsyncActivityList: + lActivityItemArgsDict = {"inGSettings":inGSettings,"inActivityList":[lActivityItem]} + lThread = threading.Thread(target=Processor.ActivityListExecute, kwargs=lActivityItemArgsDict) + lThread.start() else: # Logging info about processor activity if not SuperToken () if not __Orchestrator__.WebUserIsSuperToken(inRequest=inRequest, inGSettings=inGSettings): @@ -293,8 +307,13 @@ def pyOpenRPA_Processor(inRequest, inGSettings): lActivityTypeListStr = "Has some error with Activity Type read" lWebAuditMessageStr = __Orchestrator__.WebAuditMessageCreate(inRequest=inRequest,inOperationCodeStr=lActivityTypeListStr, inMessageStr="pyOpenRPA_Processor") if lL: lL.info(lWebAuditMessageStr) - # Append in list - inGSettings["ProcessorDict"]["ActivityList"].append(lInput) + if lInput.get("ThreadBool",False) == False: + # Append in list + inGSettings["ProcessorDict"]["ActivityList"].append(lInput) + else: + lActivityItemArgsDict = {"inGSettings": inGSettings, "inActivityList": [lInput]} + lThread = threading.Thread(target=Processor.ActivityListExecute, kwargs=lActivityItemArgsDict) + lThread.start() # Execute activity list def pyOpenRPA_ActivityListExecute(inRequest, inGSettings): # Recieve the data diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index f3a7a551..06c63bfd 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -1152,7 +1152,7 @@ def ProcessorAliasDefUpdate(inDef, inAliasStr, inGSettings = None): else: raise Exception(f"pyOpenRPA Exception: You can't use Orchestrator.ProcessorAliasDefUpdate with arg 'inDef' string value. inDef is '{inDef}', inAliasStr is '{inAliasStr}'") return inAliasStr -def ProcessorActivityItemCreate(inDef, inArgList=None, inArgDict=None, inArgGSettingsStr=None, inArgLoggerStr=None, inGUIDStr = None): +def ProcessorActivityItemCreate(inDef, inArgList=None, inArgDict=None, inArgGSettingsStr=None, inArgLoggerStr=None, inGUIDStr = None, inThreadBool = False): """ Create activity item. Activity item can be used as list item in ProcessorActivityItemAppend or in Processor.ActivityListExecute. @@ -1207,6 +1207,7 @@ def ProcessorActivityItemCreate(inDef, inArgList=None, inArgDict=None, inArgGSet :param inArgGSettingsStr: Name of def argument of the GSettings dict :param inArgLoggerStr: Name of def argument of the logging object :param inGUIDStr: GUID which you can specify. If None the GUID will be generated + :param inThreadBool: True - execute ActivityItem in new thread; False - in processor thread :return: {} """ # Work about GUID in Activity items @@ -1220,7 +1221,8 @@ def ProcessorActivityItemCreate(inDef, inArgList=None, inArgDict=None, inArgGSet "ArgDict":inArgDict, # Args dictionary "ArgGSettings": inArgGSettingsStr, # Name of GSettings attribute: str (ArgDict) or index (for ArgList) "ArgLogger": inArgLoggerStr, # Name of GSettings attribute: str (ArgDict) or index (for ArgList) - "GUIDStr": inGUIDStr + "GUIDStr": inGUIDStr, + "ThreadBool": inThreadBool } return lActivityItemDict