Fix in Agent <> Orchestrator interaction - change connection technology - need update + restart Orc and Agent

dev-linux
Ivan Maslov 4 years ago
parent 09f50c830f
commit 22c4cae9ba

@ -16,6 +16,7 @@ import requests, time
def O2A_Loop(inGSettings):
lL = inGSettings["Logger"]
lActivityLastGUIDStr = "" # Init empty ActivityLastGUIDStr
while inGSettings["O2ADict"]["IsOnlineBool"]:
# Send request to the orchestrator server
try:
@ -23,7 +24,7 @@ def O2A_Loop(inGSettings):
lHostStr = inGSettings["OrchestratorDict"]["HostStr"]
lPortInt = inGSettings["OrchestratorDict"]["PortInt"]
lURLStr=f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/O2A"
lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"]}
lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"], "ActivityLastGUIDStr": lActivityLastGUIDStr}
lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=lDataDict)
if lResponse.status_code != 200:
if lL: lL.warning(f"Agent can not connect to Orchestrator. Below the response from the orchestrator:{lResponse}")
@ -31,11 +32,15 @@ def O2A_Loop(inGSettings):
else:
lQueueItem = lResponse.json() # Try to get JSON
# Append QUEUE item in ProcessorDict > ActivityList
lActivityLastGUIDStr = lQueueItem["GUIDStr"]
inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem)
if lL: lL.debug(f"ActivityItem was recieved from orchestrator: {lQueueItem}");
if lL: lL.debug(f"ActivityItem was received from orchestrator: {lQueueItem}");
except Exception as e:
if lL: lL.exception(f"A2O Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.")
if lL: lL.exception(f"O2A Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])
except requests.exceptions.ConnectionError as e:
if lL: lL.error(f"A2O Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.")
if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])
except ConnectionResetError as e:
if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])

@ -330,7 +330,8 @@ def pyOpenRPA_ActivityListExecute(inRequest, inGSettings):
# See docs in Agent (pyOpenRPA.Agent.O2A)
def pyOpenRPA_Agent_O2A(inRequest, inGSettings):
lL = inGSettings["Logger"] # Alias
lConnectionLifetimeSecFloat = 3600.0 # 60 min * 60 sec 3600.0
lConnectionLifetimeSecFloat = 300.0 # 5 min * 60 sec 300.0
lActivityItemLifetimeLimitSecFloat = 30.0 # # 30 seconds
lTimeStartFloat = time.time()
# Recieve the data
lValueStr = None
@ -348,6 +349,7 @@ def pyOpenRPA_Agent_O2A(inRequest, inGSettings):
lThisAgentDict["ConnectionCountInt"] += 1 # increment connection count
# Test solution
lDoLoopBool = True
try:
while lDoLoopBool:
# Check if lifetime is over
if time.time() - lTimeStartFloat > lConnectionLifetimeSecFloat: # Lifetime is over
@ -357,23 +359,41 @@ def pyOpenRPA_Agent_O2A(inRequest, inGSettings):
lThisAgentDict["IsListenBool"] = True # Set is online
lQueueList = lThisAgentDict["ActivityList"]
if len(lQueueList)>0:# Do some operations if has queue items
if lL: lL.debug(f'O2A BEFORE: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}')
if lThisAgentDict["ConnectionCountInt"] == lThisAgentDict["ConnectionFirstQueueItemCountInt"] + 1:
# POP QUEUE ITEM CONDITION ConnectionCountInt == ConnectionFirstQueueItemCountInt + 1
if lL: lL.debug(f'O2A: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}')
# check if delta datetime is < than ActivityLifeTimeSecFloat
lActivityItem = lThisAgentDict["ActivityList"][0]
lActivityLifetimeSecFloat = (datetime.datetime.now() - lActivityItem["CreatedByDatetime"]).total_seconds()
# Check case if limit is expired - remove item
if lActivityLifetimeSecFloat > lActivityItemLifetimeLimitSecFloat:
lActivityItem = lThisAgentDict["ActivityList"].pop(0)
lThisAgentDict["ConnectionFirstQueueItemCountInt"] = 0
if lL: lL.debug(f"Activity was deleted from the list: {lThisAgentDict['ActivityList']}")
else:
lActivityItem = lThisAgentDict["ActivityList"][0]
lThisAgentDict["ConnectionFirstQueueItemCountInt"] += 1
if lL: lL.debug(f"Activity was !not! deleted from the list: {lThisAgentDict['ActivityList']}")
if lL: lL.debug(f'O2A AFTER: ConnectionCountInt: {lThisAgentDict["ConnectionCountInt"]};ConnectionFirstQueueItemCountInt {lThisAgentDict["ConnectionFirstQueueItemCountInt"]}')
lReturnActivityItemDict = None
# If lInput['ActivityLastGUIDStr'] is '' > return 0 element for send in Agent
if lInput['ActivityLastGUIDStr'] == "":
lReturnActivityItemDict = lActivityItem[0]
else:
# go from the end - search element with GUIDStr
lForTriggerGetNextItem = False
for lForActivityItemDict in lQueueList:
if lForTriggerGetNextItem == True:
lReturnActivityItemDict = lForActivityItemDict
break
if lForActivityItemDict['GUIDStr'] == lInput['ActivityLastGUIDStr']: lForTriggerGetNextItem = True
# CASE if GUID is not detected - return 0 element
if lReturnActivityItemDict == None and lForTriggerGetNextItem == False:
lReturnActivityItemDict = lActivityItem[0]
# Send QUEUE ITEM
if lL: lL.info(f"Activity item to agent Hostname {lInput['HostNameUpperStr']}, User {lInput['UserUpperStr']}. Activity item: {lActivityItem}")
inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lActivityItem), "utf8")
if lReturnActivityItemDict is not None:
lReturnActivityItemDict = copy.deepcopy(lReturnActivityItemDict)
if "CreatedByDatetime" in lReturnActivityItemDict:
del lReturnActivityItemDict["CreatedByDatetime"]
if lL: lL.info(f"Activity item to agent Hostname {lInput['HostNameUpperStr']}, User {lInput['UserUpperStr']}. Activity item: {lReturnActivityItemDict}")
inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lReturnActivityItemDict), "utf8")
lDoLoopBool = False # CLose the connection
else: # no queue item - sleep for the next iteration
time.sleep(1)
except Exception as e:
pass
lThisAgentDict["ConnectionCountInt"] -= 1 # Connection go to be closed - decrement the connection count
# See docs in Agent (pyOpenRPA.Agent.A2O)
def pyOpenRPA_Agent_A2O(inRequest, inGSettings):

@ -1,4 +1,4 @@
import subprocess, json, psutil, time, os, win32security, sys, base64, logging, ctypes #Get input argument
import subprocess, json, psutil, time, os, win32security, sys, base64, logging, ctypes, copy #Get input argument
from . import Server
from . import Timer
from . import Processor
@ -37,11 +37,19 @@ def AgentActivityItemAdd(inGSettings, inHostNameStr, inUserStr, inActivityItemDi
:param inActivityItemDict: ActivityItem
:return: None
"""
lActivityItemDict = copy.deepcopy(inActivityItemDict)
# Add GUIDStr if not exist
if "GUIDStr" not in lActivityItemDict:
lGUIDStr = str(uuid.uuid4()) # generate new GUID
lActivityItemDict["GUIDStr"] = lGUIDStr
# Add CreatedByDatetime
lActivityItemDict["CreatedByDatetime"] = datetime.datetime.now()
# Main alg
lAgentDictItemKeyTurple = (inHostNameStr.upper(),inUserStr.upper())
if lAgentDictItemKeyTurple not in inGSettings["AgentDict"]:
inGSettings["AgentDict"][lAgentDictItemKeyTurple] = SettingsTemplate.__AgentDictItemCreate__()
lThisAgentDict = inGSettings["AgentDict"][lAgentDictItemKeyTurple]
lThisAgentDict["ActivityList"].append(inActivityItemDict)
lThisAgentDict["ActivityList"].append(lActivityItemDict)
def AgentOSCMD(inGSettings, inHostNameStr, inUserStr, inCMDStr, inRunAsyncBool=True, inSendOutputToOrchestratorLogsBool=True, inCMDEncodingStr="cp1251"):

Loading…
Cancel
Save