From 13cc33cd12101be1898ada50e2bf7f97544e2730 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Fri, 25 Jun 2021 17:18:11 +0300 Subject: [PATCH] Agent - add timeout settings! Actual when network has some soft, which can force close the connection without response... --- Agent/AgentSettings.py | 2 ++ Sources/pyOpenRPA/Agent/A2O.py | 2 +- Sources/pyOpenRPA/Agent/O2A.py | 22 ++++++++++++++-------- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/Agent/AgentSettings.py b/Agent/AgentSettings.py index ce7fe465..881a046f 100644 --- a/Agent/AgentSettings.py +++ b/Agent/AgentSettings.py @@ -18,10 +18,12 @@ if __name__ == "__main__": # New init way }, "O2ADict":{ "IsOnlineBool": True, # Parameter can be changed when program executes + "ConnectionTimeoutSecFloat": 600.0, # Time to wait response from requests. If exceed - raise timeout exception "RetryTimeoutSecFloat": 10.0, # Retry interval if some error when connect }, "A2ODict": { + "ConnectionTimeoutSecFloat": 10.0, # Time to wait response from requests. If exceed - raise timeout exception "RetryTimeoutSecFloat": 10.0, # Retry interval if some error when connect }, "Logger":logging.getLogger("Agent"), diff --git a/Sources/pyOpenRPA/Agent/A2O.py b/Sources/pyOpenRPA/Agent/A2O.py index f16cdf1d..3f302476 100644 --- a/Sources/pyOpenRPA/Agent/A2O.py +++ b/Sources/pyOpenRPA/Agent/A2O.py @@ -16,7 +16,7 @@ def _A2ODataSend(inGSettings, inDataDict): lHostStr = inGSettings["OrchestratorDict"]["HostStr"] lPortInt = inGSettings["OrchestratorDict"]["PortInt"] lURLStr=f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/A2O" - lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=inDataDict) + lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=inDataDict, timeout=inGSettings["A2ODict"]["ConnectionTimeoutSecFloat"]) except Exception as e: if lL: lL.exception(f"A2O Error handler.") diff --git a/Sources/pyOpenRPA/Agent/O2A.py b/Sources/pyOpenRPA/Agent/O2A.py index 38b92316..0f040635 100644 --- a/Sources/pyOpenRPA/Agent/O2A.py +++ b/Sources/pyOpenRPA/Agent/O2A.py @@ -26,25 +26,31 @@ def O2A_Loop(inGSettings): lPortInt = inGSettings["OrchestratorDict"]["PortInt"] lURLStr=f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/O2A" lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"], "ActivityLastGUIDStr": lActivityLastGUIDStr} - lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=lDataDict) + lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=lDataDict, timeout=inGSettings["O2ADict"]["ConnectionTimeoutSecFloat"]) if lResponse.status_code != 200: if lL: lL.warning(f"Agent can not connect to Orchestrator. Below the response from the orchestrator:{lResponse}") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) else: lRequestBody = lResponse.text - lQueueItem = lResponse.json() # Try to get JSON - # Append QUEUE item in ProcessorDict > ActivityList - lActivityLastGUIDStr = lQueueItem["GUIDStr"] - inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) - if lL: lL.debug(f"ActivityItem was received from orchestrator: {lQueueItem}"); + if len(lRequestBody) != 0: # CHeck if not empty result when close the connection from orch + lQueueItem = lResponse.json() # Try to get JSON + # Append QUEUE item in ProcessorDict > ActivityList + lActivityLastGUIDStr = lQueueItem["GUIDStr"] + inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) + if lL: lL.debug(f"ActivityItem was received from orchestrator: {lQueueItem}"); + else: + if lL: lL.debug(f"Empty response from the orchestrator - loop when refresh the connection between Orc and Agent"); except requests.exceptions.ConnectionError as e: if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) except ConnectionResetError as e: - if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") + if lL: lL.error(f"O2A Connection reset error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) except json.decoder.JSONDecodeError as e: - if lL: lL.error(f"See body of the recieved content from the Orchestrator: {lRequestBody}") + if lL: lL.error(f"O2A JSON decode error - See body of the recieved content from the Orchestrator: {lRequestBody}") + time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) + except requests.exceptions.Timeout as e: + if lL: lL.exception(f"O2A requests timeout error (no response for long time). Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) except Exception as e: if lL: lL.exception(f"O2A Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.")