From 22c4cae9ba9a6542578de73dee3a00731b90758c Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Mon, 12 Apr 2021 17:46:17 +0300 Subject: [PATCH] Fix in Agent <> Orchestrator interaction - change connection technology - need update + restart Orc and Agent --- Sources/pyOpenRPA/Agent/O2A.py | 13 ++-- .../pyOpenRPA/Orchestrator/ServerSettings.py | 72 ++++++++++++------- .../Orchestrator/__Orchestrator__.py | 12 +++- 3 files changed, 65 insertions(+), 32 deletions(-) diff --git a/Sources/pyOpenRPA/Agent/O2A.py b/Sources/pyOpenRPA/Agent/O2A.py index 082d7827..68aa09c8 100644 --- a/Sources/pyOpenRPA/Agent/O2A.py +++ b/Sources/pyOpenRPA/Agent/O2A.py @@ -16,6 +16,7 @@ import requests, time def O2A_Loop(inGSettings): lL = inGSettings["Logger"] + lActivityLastGUIDStr = "" # Init empty ActivityLastGUIDStr while inGSettings["O2ADict"]["IsOnlineBool"]: # Send request to the orchestrator server try: @@ -23,7 +24,7 @@ def O2A_Loop(inGSettings): lHostStr = inGSettings["OrchestratorDict"]["HostStr"] lPortInt = inGSettings["OrchestratorDict"]["PortInt"] lURLStr=f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/O2A" - lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"]} + lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"], "ActivityLastGUIDStr": lActivityLastGUIDStr} lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=lDataDict) if lResponse.status_code != 200: if lL: lL.warning(f"Agent can not connect to Orchestrator. Below the response from the orchestrator:{lResponse}") @@ -31,11 +32,15 @@ def O2A_Loop(inGSettings): else: lQueueItem = lResponse.json() # Try to get JSON # Append QUEUE item in ProcessorDict > ActivityList + lActivityLastGUIDStr = lQueueItem["GUIDStr"] inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) - if lL: lL.debug(f"ActivityItem was recieved from orchestrator: {lQueueItem}"); + if lL: lL.debug(f"ActivityItem was received from orchestrator: {lQueueItem}"); except Exception as e: - if lL: lL.exception(f"A2O Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") + if lL: lL.exception(f"O2A Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) except requests.exceptions.ConnectionError as e: - if lL: lL.error(f"A2O Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") + if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") + time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) + except ConnectionResetError as e: + if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) \ No newline at end of file diff --git a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py index be5c1532..6610118c 100644 --- a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py +++ b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py @@ -330,7 +330,8 @@ def pyOpenRPA_ActivityListExecute(inRequest, inGSettings): # See docs in Agent (pyOpenRPA.Agent.O2A) def pyOpenRPA_Agent_O2A(inRequest, inGSettings): lL = inGSettings["Logger"] # Alias - lConnectionLifetimeSecFloat = 3600.0 # 60 min * 60 sec 3600.0 + lConnectionLifetimeSecFloat = 300.0 # 5 min * 60 sec 300.0 + lActivityItemLifetimeLimitSecFloat = 30.0 # # 30 seconds lTimeStartFloat = time.time() # Recieve the data lValueStr = None @@ -348,32 +349,51 @@ def pyOpenRPA_Agent_O2A(inRequest, inGSettings): lThisAgentDict["ConnectionCountInt"] += 1 # increment connection count # Test solution lDoLoopBool = True - while lDoLoopBool: - # Check if lifetime is over - if time.time() - lTimeStartFloat > lConnectionLifetimeSecFloat: # Lifetime is over - lThisAgentDict["IsListenBool"] = False # Set is offline - lDoLoopBool = False - else: # Lifetime is good - do alg - lThisAgentDict["IsListenBool"] = True # Set is online - lQueueList = lThisAgentDict["ActivityList"] - if len(lQueueList)>0:# Do some operations if has queue items - if lL: lL.debug(f'O2A BEFORE: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}') - if lThisAgentDict["ConnectionCountInt"] == lThisAgentDict["ConnectionFirstQueueItemCountInt"] + 1: - # POP QUEUE ITEM CONDITION ConnectionCountInt == ConnectionFirstQueueItemCountInt + 1 - lActivityItem = lThisAgentDict["ActivityList"].pop(0) - lThisAgentDict["ConnectionFirstQueueItemCountInt"] = 0 - if lL: lL.debug(f"Activity was deleted from the list: {lThisAgentDict['ActivityList']}") - else: + try: + while lDoLoopBool: + # Check if lifetime is over + if time.time() - lTimeStartFloat > lConnectionLifetimeSecFloat: # Lifetime is over + lThisAgentDict["IsListenBool"] = False # Set is offline + lDoLoopBool = False + else: # Lifetime is good - do alg + lThisAgentDict["IsListenBool"] = True # Set is online + lQueueList = lThisAgentDict["ActivityList"] + if len(lQueueList)>0:# Do some operations if has queue items + if lL: lL.debug(f'O2A: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}') + # check if delta datetime is < than ActivityLifeTimeSecFloat lActivityItem = lThisAgentDict["ActivityList"][0] - lThisAgentDict["ConnectionFirstQueueItemCountInt"] += 1 - if lL: lL.debug(f"Activity was !not! deleted from the list: {lThisAgentDict['ActivityList']}") - if lL: lL.debug(f'O2A AFTER: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}') - # Send QUEUE ITEM - if lL: lL.info(f"Activity item to agent Hostname {lInput['HostNameUpperStr']}, User {lInput['UserUpperStr']}. Activity item: {lActivityItem}") - inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lActivityItem), "utf8") - lDoLoopBool = False # CLose the connection - else: # no queue item - sleep for the next iteration - time.sleep(1) + lActivityLifetimeSecFloat = (datetime.datetime.now() - lActivityItem["CreatedByDatetime"]).total_seconds() + # Check case if limit is expired - remove item + if lActivityLifetimeSecFloat > lActivityItemLifetimeLimitSecFloat: + lActivityItem = lThisAgentDict["ActivityList"].pop(0) + else: + lReturnActivityItemDict = None + # If lInput['ActivityLastGUIDStr'] is '' > return 0 element for send in Agent + if lInput['ActivityLastGUIDStr'] == "": + lReturnActivityItemDict = lActivityItem[0] + else: + # go from the end - search element with GUIDStr + lForTriggerGetNextItem = False + for lForActivityItemDict in lQueueList: + if lForTriggerGetNextItem == True: + lReturnActivityItemDict = lForActivityItemDict + break + if lForActivityItemDict['GUIDStr'] == lInput['ActivityLastGUIDStr']: lForTriggerGetNextItem = True + # CASE if GUID is not detected - return 0 element + if lReturnActivityItemDict == None and lForTriggerGetNextItem == False: + lReturnActivityItemDict = lActivityItem[0] + # Send QUEUE ITEM + if lReturnActivityItemDict is not None: + lReturnActivityItemDict = copy.deepcopy(lReturnActivityItemDict) + if "CreatedByDatetime" in lReturnActivityItemDict: + del lReturnActivityItemDict["CreatedByDatetime"] + if lL: lL.info(f"Activity item to agent Hostname {lInput['HostNameUpperStr']}, User {lInput['UserUpperStr']}. Activity item: {lReturnActivityItemDict}") + inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lReturnActivityItemDict), "utf8") + lDoLoopBool = False # CLose the connection + else: # no queue item - sleep for the next iteration + time.sleep(1) + except Exception as e: + pass lThisAgentDict["ConnectionCountInt"] -= 1 # Connection go to be closed - decrement the connection count # See docs in Agent (pyOpenRPA.Agent.A2O) def pyOpenRPA_Agent_A2O(inRequest, inGSettings): diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index 5d829121..5383fe52 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -1,4 +1,4 @@ -import subprocess, json, psutil, time, os, win32security, sys, base64, logging, ctypes #Get input argument +import subprocess, json, psutil, time, os, win32security, sys, base64, logging, ctypes, copy #Get input argument from . import Server from . import Timer from . import Processor @@ -37,11 +37,19 @@ def AgentActivityItemAdd(inGSettings, inHostNameStr, inUserStr, inActivityItemDi :param inActivityItemDict: ActivityItem :return: None """ + lActivityItemDict = copy.deepcopy(inActivityItemDict) + # Add GUIDStr if not exist + if "GUIDStr" not in lActivityItemDict: + lGUIDStr = str(uuid.uuid4()) # generate new GUID + lActivityItemDict["GUIDStr"] = lGUIDStr + # Add CreatedByDatetime + lActivityItemDict["CreatedByDatetime"] = datetime.datetime.now() + # Main alg lAgentDictItemKeyTurple = (inHostNameStr.upper(),inUserStr.upper()) if lAgentDictItemKeyTurple not in inGSettings["AgentDict"]: inGSettings["AgentDict"][lAgentDictItemKeyTurple] = SettingsTemplate.__AgentDictItemCreate__() lThisAgentDict = inGSettings["AgentDict"][lAgentDictItemKeyTurple] - lThisAgentDict["ActivityList"].append(inActivityItemDict) + lThisAgentDict["ActivityList"].append(lActivityItemDict) def AgentOSCMD(inGSettings, inHostNameStr, inUserStr, inCMDStr, inRunAsyncBool=True, inSendOutputToOrchestratorLogsBool=True, inCMDEncodingStr="cp1251"):