From cbc4b42180f883ea0d7c725042b23fcf8faf6b03 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Sun, 6 Dec 2020 12:25:20 +0300 Subject: [PATCH] - Orch web: Fix eror in serverDataRender when error render - Agent: Add CMD to kill agent because it is in background mode - Orch: Add handler to set connection when Agent listen orch (/pyOpenRPA/Agent/O2A) - Orch start processor Dict in own thread (Processor.ProcessorRunSync(inGSettings)) - Agent: Create Processor in Agent similarly to Orchestrator (pyOpenRPA.Agent.Processor == pyOpenRPA.Orchestrator.Processor) --- Agent/AgentDaemonKill.cmd | 3 + Agent/AgentSettings.py | 17 +++++- Sources/pyOpenRPA/Agent/Agent.py | 9 ++- Sources/pyOpenRPA/Agent/O2A.py | 13 +++- Sources/pyOpenRPA/Agent/Processor.py | 61 +++++++++++++++++++ .../pyOpenRPA/Orchestrator/Orchestrator.py | 7 +++ Sources/pyOpenRPA/Orchestrator/Processor.py | 10 +-- .../pyOpenRPA/Orchestrator/ServerSettings.py | 43 ++++++++++++- .../Orchestrator/SettingsTemplate.py | 3 + Sources/pyOpenRPA/Orchestrator/Web/Index.js | 5 +- .../pyOpenRPA/Orchestrator/Web/Index.xhtml | 27 +++----- changelog.md | 5 ++ 12 files changed, 175 insertions(+), 28 deletions(-) create mode 100644 Agent/AgentDaemonKill.cmd create mode 100644 Sources/pyOpenRPA/Agent/Processor.py diff --git a/Agent/AgentDaemonKill.cmd b/Agent/AgentDaemonKill.cmd new file mode 100644 index 00000000..22b20308 --- /dev/null +++ b/Agent/AgentDaemonKill.cmd @@ -0,0 +1,3 @@ +cd %~dp0 +taskkill /F /FI "USERNAME eq %username%" /IM pyOpenRPA_Agent.exe +pause >nul \ No newline at end of file diff --git a/Agent/AgentSettings.py b/Agent/AgentSettings.py index 73aefed5..b1efad79 100644 --- a/Agent/AgentSettings.py +++ b/Agent/AgentSettings.py @@ -28,7 +28,22 @@ if __name__ == "__main__": # New init way "AgentDict": { # Will be filled automatically "HostNameUpperStr":None, # Machine hostname "UserUpperStr": None, # Username string - } + }, + "ProcessorDict": { # Has been changed. New general processor (one threaded) v.1.2.0 + "ActivityList": [ # List of the activities + # { + # "Def":"DefAliasTest", # def link or def alias (look gSettings["Processor"]["AliasDefDict"]) + # "ArgList":[1,2,3], # Args list + # "ArgDict":{"ttt":1,"222":2,"dsd":3} # Args dictionary + # "ArgGSettings": # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # "ArgLogger": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # }, + ], + "AliasDefDict": {}, # Storage for def with Str alias. To use it see pyOpenRPA.Orchestrator.ControlPanel + "CheckIntervalSecFloat": 1.0, # Interval for check gSettings in ProcessorDict > ActivityList + "ExecuteBool": True, # Flag to execute thread processor + "ThreadIdInt": None # Technical field - will be setup when processor init + }, } if not os.path.exists("Reports"): diff --git a/Sources/pyOpenRPA/Agent/Agent.py b/Sources/pyOpenRPA/Agent/Agent.py index a67980f4..80b055f8 100644 --- a/Sources/pyOpenRPA/Agent/Agent.py +++ b/Sources/pyOpenRPA/Agent/Agent.py @@ -1,12 +1,19 @@ import threading, socket, getpass from . import O2A, A2O # Data flow Orchestrator To Agent - +from . import Processor # Processor Queue # Main def def Agent(inGSettings): + lL = inGSettings["Logger"] # Detect Machine host name and username inGSettings["AgentDict"]["HostNameUpperStr"] = socket.gethostname().upper() inGSettings["AgentDict"]["UserUpperStr"] = getpass.getuser().upper() + # Processor thread + lProcessorThread = threading.Thread(target= Processor.ProcessorRunSync, kwargs={"inGSettings":inGSettings}) + lProcessorThread.daemon = True # Run the thread in daemon mode. + lProcessorThread.start() # Start the thread execution. + if lL: lL.info("Processor has been started (ProcessorDict)") #Logging + # Start thread to wait data from Orchestrator (O2A) lO2AThread = threading.Thread(target=O2A.O2A_Loop, kwargs={"inGSettings":inGSettings}) lO2AThread.start() diff --git a/Sources/pyOpenRPA/Agent/O2A.py b/Sources/pyOpenRPA/Agent/O2A.py index fdc772ae..5ab71cbe 100644 --- a/Sources/pyOpenRPA/Agent/O2A.py +++ b/Sources/pyOpenRPA/Agent/O2A.py @@ -5,7 +5,14 @@ import requests, time # Request BODY: # { "HostNameUpperStr": "", "UserUpperStr": "" } # Response BODY: -# {} +# QUEUE ITEM +# # { +# # "Def":"DefAliasTest", # def link or def alias (look gSettings["Processor"]["AliasDefDict"]) +# # "ArgList":[1,2,3], # Args list +# # "ArgDict":{"ttt":1,"222":2,"dsd":3} # Args dictionary +# # "ArgGSettings": # Name of GSettings attribute: str (ArgDict) or index (for ArgList) +# # "ArgLogger": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) +# # }, def O2A_Loop(inGSettings): lL = inGSettings["Logger"] @@ -21,6 +28,10 @@ def O2A_Loop(inGSettings): if lResponse.status_code != 200: if lL: lL.warning(f"Agent can not connect to Orchestrator. Below the response from the orchestrator:{lResponse}") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) + else: + lQueueItem = lResponse.json() # Try to get JSON + # Append QUEUE item in ProcessorDict > ActivityList + inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) except Exception as e: if lL: lL.exception(f"A2O Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) \ No newline at end of file diff --git a/Sources/pyOpenRPA/Agent/Processor.py b/Sources/pyOpenRPA/Agent/Processor.py new file mode 100644 index 00000000..c254c0f6 --- /dev/null +++ b/Sources/pyOpenRPA/Agent/Processor.py @@ -0,0 +1,61 @@ +# 1.2.0 - general processor - contains old orchestrator processor + RDPActive processor +import time, copy, threading +# Run processor synchronious +def ProcessorRunSync(inGSettings): + """ + "ProcessorDict": { # Has been changed. New general processor (one threaded) v.1.2.0 + "ActivityList": [ # List of the activities + # { + # "Def":"DefAliasTest", # def link or def alias (look gSettings["Processor"]["AliasDefDict"]) + # "ArgList":[1,2,3], # Args list + # "ArgDict":{"ttt":1,"222":2,"dsd":3}, # Args dictionary + # "ArgGSettings": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # "ArgLogger": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + # }, + ], + "AliasDefDict": {}, # Storage for def with Str alias. To use it see pyOpenRPA.Orchestrator.ControlPanel + "CheckIntervalSecFloat": 1.0 # Interval for check gSettings in ProcessorDict > ActivityList + "ExecuteBool": True # Flag to execute thread processor + """ + inGSettings["ProcessorDict"]["ThreadIdInt"] = threading.get_ident() # fill Processor thread id + while inGSettings["ProcessorDict"]["ExecuteBool"]: + lActivityItem = inGSettings["ProcessorDict"]["ActivityList"].pop(0, None) # Extract the first item from processor queue + while lActivityItem is not None: + ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item + time.sleep(inGSettings["ProcessorDict"]["CheckIntervalSecFloat"]) # Sleep when list is empty + +# Execute ActivityItem list +# return the def result +def ActivityListExecute(inGSettings, inActivityList): + lL = inGSettings["Logger"] # Logger alias + lResultList = [] # init the result list + for lActivityItem in inActivityList: # Iterate throught the activity list + lDef = None # Def variable + if callable(lActivityItem["Def"]): # CHeck if def is callable + lDef = lActivityItem["Def"] # Get the def + else: # Is not callable - check alias + lDef = inGSettings["ProcessorDict"]["AliasDefDict"].get(lActivityItem["Def"], None) # get def if def key in Alias def storage + #gSettings + lGSettingsDictKey = lActivityItem.pop("ArgGSettings",None) + # # Prepare arg dict - gSettings + if type(lGSettingsDictKey) is str: # check if gSetting key is in ArgDict + lActivityItem["ArgDict"][lGSettingsDictKey] = inGSettings # Set the gSettings in dict + # # Prepare arg list + elif type(lGSettingsDictKey) is int: # check if gSetting key is in ArgDict + lActivityItem["ArgList"].insert(lGSettingsDictKey,inGSettings)# Set the gSettings in list by the index + #Logger + lLoggerDictKey = lActivityItem.pop("ArgLogger",None) + # # Prepare arg dict - gSettings + if type(lLoggerDictKey) is str: # check if gSetting key is in ArgDict + lActivityItem["ArgDict"][lLoggerDictKey] = lL # Set the lLogger in dict + # # Prepare arg list + elif type(lLoggerDictKey) is int: # check if gSetting key is in ArgDict + lActivityItem["ArgList"].insert(lLoggerDictKey,lL)# Set the lLogger in list by the index + + try: # try to run function from Processor.py + lActivityItemResult = lDef(*lActivityItem["ArgList"], **lActivityItem["ArgDict"]) + lResultList.append(lActivityItemResult) # return the result + except Exception as e: + if lL: lL.exception(f"Processor.ActivityListExecute: Exception in def execution - activity will be ignored. Activity item: {lActivityItem}") # Logging + lResultList.append(e) # return the generated exception + return lResultList # return the result list \ No newline at end of file diff --git a/Sources/pyOpenRPA/Orchestrator/Orchestrator.py b/Sources/pyOpenRPA/Orchestrator/Orchestrator.py index 9b0902a2..ba23cce1 100644 --- a/Sources/pyOpenRPA/Orchestrator/Orchestrator.py +++ b/Sources/pyOpenRPA/Orchestrator/Orchestrator.py @@ -657,6 +657,13 @@ def Orchestrator(inGSettings): for lActivityItem in gSettingsDict["OrchestratorStart"]["ActivityList"]: Processor.ActivityListOrDict(lActivityItem) + # Processor thread + lProcessorThread = threading.Thread(target= Processor.ProcessorRunSync, kwargs={"inGSettings":gSettingsDict}) + lProcessorThread.daemon = True # Run the thread in daemon mode. + lProcessorThread.start() # Start the thread execution. + if lL: lL.info("Processor has been started (ProcessorDict)") #Logging + + if lL: lL.info("Scheduler loop start") #Logging gDaemonActivityLogDictRefreshSecInt = 10 # The second period for clear lDaemonActivityLogDict from old items gDaemonActivityLogDictLastTime = time.time() # The second perioad for clean lDaemonActivityLogDict from old items diff --git a/Sources/pyOpenRPA/Orchestrator/Processor.py b/Sources/pyOpenRPA/Orchestrator/Processor.py index c254c0f6..6d7207e6 100644 --- a/Sources/pyOpenRPA/Orchestrator/Processor.py +++ b/Sources/pyOpenRPA/Orchestrator/Processor.py @@ -19,10 +19,12 @@ def ProcessorRunSync(inGSettings): """ inGSettings["ProcessorDict"]["ThreadIdInt"] = threading.get_ident() # fill Processor thread id while inGSettings["ProcessorDict"]["ExecuteBool"]: - lActivityItem = inGSettings["ProcessorDict"]["ActivityList"].pop(0, None) # Extract the first item from processor queue - while lActivityItem is not None: - ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item - time.sleep(inGSettings["ProcessorDict"]["CheckIntervalSecFloat"]) # Sleep when list is empty + lActivityList = inGSettings["ProcessorDict"]["ActivityList"] # Alias + if len(lActivityList)>0: + lActivityItem = inGSettings["ProcessorDict"]["ActivityList"].pop(0) # Extract the first item from processor queue + while lActivityItem is not None: + ActivityListExecute(inGSettings = inGSettings, inActivityList = [lActivityItem]) # execute the activity item + time.sleep(inGSettings["ProcessorDict"]["CheckIntervalSecFloat"]) # Sleep when list is empty # Execute ActivityItem list # return the def result diff --git a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py index 7532822e..fb264431 100644 --- a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py +++ b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py @@ -13,6 +13,7 @@ import threading # Multi-threading from .Web import Basic from . import BackwardCompatibility # Support old up to 1.2.0 defs from . import Processor +from . import SettingsTemplate # # # # # # # # # # # # # v 1.2.0 Functionallity # # # # # # # # # # # # @@ -211,9 +212,46 @@ def pyOpenRPA_ActivityListExecute(inRequest, inGSettings): # See docs in Agent (pyOpenRPA.Agent.O2A) def pyOpenRPA_Agent_O2A(inRequest, inGSettings): + lConnectionLifetimeSecFloat = 5.0 # 60 min * 60 sec + lTimeStartFloat = time.time() + # Recieve the data + lValueStr = None + if inRequest.headers.get('Content-Length') is not None: + lInputByteArrayLength = int(inRequest.headers.get('Content-Length')) + lInputByteArray = inRequest.rfile.read(lInputByteArrayLength) + # Превращение массива байт в объект + lInput = json.loads(lInputByteArray.decode('utf8')) + # Check if item is created + lAgentDictItemKeyTurple = (lInput["HostNameUpperStr"],lInput["UserUpperStr"]) + if lAgentDictItemKeyTurple not in inGSettings["AgentDict"]: + inGSettings["AgentDict"][lAgentDictItemKeyTurple] = SettingsTemplate.__AgentDictItemCreate__() + lThisAgentDict = inGSettings["AgentDict"][lAgentDictItemKeyTurple] + lThisAgentDict["IsListenBool"]=True # Set is online + lThisAgentDict["ConnectionCountInt"] += 1 # increment connection count # Test solution - while True: - time.sleep(3) + lDoLoopBool = True + while lDoLoopBool: + # Check if lifetime is over + if time.time() - lTimeStartFloat > lConnectionLifetimeSecFloat: # Lifetime is over + lThisAgentDict["IsListenBool"] = False # Set is offline + lThisAgentDict["ConnectionCountInt"] -= 1 # decrement connection count + lDoLoopBool = False + else: # Lifetime is good - do alg + lThisAgentDict["IsListenBool"] = True # Set is online + lQueueList = lThisAgentDict["QueueList"] + if len(lQueueList)>0:# Do some operations if has queue items + if lThisAgentDict["ConnectionCountInt"] == lThisAgentDict["ConnectionFirstQueueItemCountInt"] - 1: + # POP QUEUE ITEM CONDITION ConnectionCountInt == ConnectionFirstQueueItemCountInt - 1 + lQueueItem = lThisAgentDict["QueueList"].pop(0) + lThisAgentDict["ConnectionFirstQueueItemCountInt"] = 0 + else: + lQueueItem = lThisAgentDict["QueueList"][0] + lThisAgentDict["ConnectionFirstQueueItemCountInt"] += 1 + # Send QUEUE ITEM + inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lQueueItem), "utf8") + lThisAgentDict["ConnectionCountInt"] -= 1 # Connection go to be closed - decrement the connection count + else: # no queue item - sleep for the next iteration + time.sleep(1) # See docs in Agent (pyOpenRPA.Agent.A2O) def pyOpenRPA_Agent_A2O(inRequest, inGSettings): @@ -222,7 +260,6 @@ def pyOpenRPA_Agent_A2O(inRequest, inGSettings): if inRequest.headers.get('Content-Length') is not None: lInputByteArrayLength = int(inRequest.headers.get('Content-Length')) lInputByteArray = inRequest.rfile.read(lInputByteArrayLength) - print(lInputByteArray) # Превращение массива байт в объект lInput = json.loads(lInputByteArray.decode('utf8')) if "LogList" in lInput: diff --git a/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py b/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py index 498e1653..d04b7b4c 100644 --- a/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py +++ b/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py @@ -295,6 +295,9 @@ def __Create__(): } # Create full configuration for +def __AgentDictItemCreate__(): + return {"IsListenBool":False, "ConnectionCountInt":0, "ConnectionFirstQueueItemCountInt":0, "QueueList":[]} +# Create full configuration for def __UACClientAdminCreate__(): lResultDict = { "pyOpenRPADict":{ diff --git a/Sources/pyOpenRPA/Orchestrator/Web/Index.js b/Sources/pyOpenRPA/Orchestrator/Web/Index.js index 7680a0ed..3a3f22fc 100644 --- a/Sources/pyOpenRPA/Orchestrator/Web/Index.js +++ b/Sources/pyOpenRPA/Orchestrator/Web/Index.js @@ -415,10 +415,13 @@ $(document).ready(function() { mGlobal.pyOpenRPA.ServerDataHashStr = lResponseJSON["HashStr"] mGlobal.pyOpenRPA.ServerDataRefreshDef_TechnicalRender() mGlobal.UserRoleUpdate(); + mGlobal.pyOpenRPA.ServerDataRefreshDef() // Go to the next call } catch(error) { + console.log(error) + setTimeout(mGlobal.pyOpenRPA.ServerDataRefreshDef,3000) } - mGlobal.pyOpenRPA.ServerDataRefreshDef() // recursive + //mGlobal.pyOpenRPA.ServerDataRefreshDef() // recursive }, dataType: "text", error: function(jqXHR, textStatus, errorThrown ) { diff --git a/Sources/pyOpenRPA/Orchestrator/Web/Index.xhtml b/Sources/pyOpenRPA/Orchestrator/Web/Index.xhtml index 1ac191c9..ac92dc76 100644 --- a/Sources/pyOpenRPA/Orchestrator/Web/Index.xhtml +++ b/Sources/pyOpenRPA/Orchestrator/Web/Index.xhtml @@ -59,7 +59,8 @@             -

Orchestrator Web GUI

+     +

ORCHESTRATOR WEB GUI

@@ -199,7 +200,7 @@
@@ -239,32 +240,24 @@
diff --git a/changelog.md b/changelog.md index 919672b8..b9933660 100644 --- a/changelog.md +++ b/changelog.md @@ -53,6 +53,11 @@ - Create Agent support in Orchestrator (/pyOpenRPA/Agent/O2A and /pyOpenRPA/Agent/A2O) - Orch: /pyOpenRPA/ServerData - add sub dict "AgentDict" - Orch WEB: Create Agent render +- Orch web: Fix eror in serverDataRender when error render +- Agent: Add CMD to kill agent because it is in background mode +- Orch: Add handler to set connection when Agent listen orch (/pyOpenRPA/Agent/O2A) +- Orch start processor Dict in own thread (Processor.ProcessorRunSync(inGSettings)) +- Agent: Create Processor in Agent similarly to Orchestrator (pyOpenRPA.Agent.Processor == pyOpenRPA.Orchestrator.Processor) [1.1.0] After 2 month test prefinal with new improovements (+RobotRDPActive in Orchestrator + Easy ControlPanelTemplate) Beta before 1.1.0 (new way of OpenRPA with improvements. Sorry, but no backward compatibility)/ Backward compatibility will start from 1.0.1