diff --git a/Sources/pyOpenRPA/Agent/__Agent__.py b/Sources/pyOpenRPA/Agent/__Agent__.py index 2ac32d21..9ac93c19 100644 --- a/Sources/pyOpenRPA/Agent/__Agent__.py +++ b/Sources/pyOpenRPA/Agent/__Agent__.py @@ -17,6 +17,20 @@ def OSFileBinaryDataBase64StrCreate(inFilePathStr, inFileDataBase64Str,inGSettin if lL: lL.info(lMessageStr) A2O.LogListSend(inGSettings=inGSettings, inLogList=[lMessageStr]) +# Append binary file by the base64 string (safe for JSON transmition) +def OSFileBinaryDataBase64StrAppend(inFilePathStr, inFileDataBase64Str,inGSettings = None): + """ + Create binary file by the base64 string (safe for JSON transmition) + + """ + lFile = open(inFilePathStr, "ab") + lFile.write(base64.b64decode(inFileDataBase64Str)) + lFile.close() + lL = inGSettings.get("Logger", None) if type(inGSettings) is dict else None + lMessageStr = f"AGENT Binary file {inFilePathStr} has been appended." + if lL: lL.info(lMessageStr) + A2O.LogListSend(inGSettings=inGSettings, inLogList=[lMessageStr]) + # Create text file by the string def OSFileTextDataStrCreate(inFilePathStr, inFileDataStr, inEncodingStr = "utf-8",inGSettings = None): """ diff --git a/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py b/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py index 2bfac13b..e6ac51d4 100644 --- a/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py +++ b/Sources/pyOpenRPA/Orchestrator/BackwardCompatibility.py @@ -468,4 +468,16 @@ def Update(inGSettings): } } if lL: lL.warning( - f"Backward compatibility (v1.2.2 to v1.2.3): Add new key ServerDict > ListenDict. Transfer port from ServerDict > ListenPort") # Log about compatibility \ No newline at end of file + f"Backward compatibility (v1.2.2 to v1.2.3): Add new key ServerDict > ListenDict. Transfer port from ServerDict > ListenPort") # Log about compatibility + # Add new key + #"ServerDict": { + # "AgentFileChunkBytesSizeInt": 50000000, # size of the each chunk for the agent transmition + # "AgentFileChunkCheckIntervalSecFloat": 0.2, # The interval for check last activity item was successfully sent + if "AgentFileChunkBytesSizeInt" not in inGSettings["ServerDict"]: + inGSettings["ServerDict"]["AgentFileChunkBytesSizeInt"]= 50000000 + if lL: lL.warning( + f"Backward compatibility (v1.2.3 to v1.2.4): Add new key ServerDict > AgentFileChunkBytesSizeInt") # Log about compatibility + if "AgentFileChunkCheckIntervalSecFloat" not in inGSettings["ServerDict"]: + inGSettings["ServerDict"]["AgentFileChunkCheckIntervalSecFloat"]= 0.2 + if lL: lL.warning( + f"Backward compatibility (v1.2.3 to v1.2.4): Add new key ServerDict > AgentFileChunkCheckIntervalSecFloat") # Log about compatibility \ No newline at end of file diff --git a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py index b3197ee8..aba20211 100644 --- a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py +++ b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py @@ -417,6 +417,7 @@ def pyOpenRPA_Agent_A2O(inRequest, inGSettings): lInputByteArray = inRequest.rfile.read(lInputByteArrayLength) # Превращение массива байт в объект lInput = json.loads(lInputByteArray.decode('utf8')) + lAgentDictItemKeyTurple = (lInput["HostNameUpperStr"], lInput["UserUpperStr"]) if "LogList" in lInput: for lLogItemStr in lInput["LogList"]: inGSettings["Logger"].info(lLogItemStr) @@ -426,7 +427,17 @@ def pyOpenRPA_Agent_A2O(inRequest, inGSettings): # Create item in gSettings inGSettings["AgentActivityReturnDict"][lActivityReturnItemKeyStr]=SettingsTemplate.__AgentActivityReturnDictItemCreate__(inReturn=lActivityReturnItemValue) if lL: lL.debug(f"SERVER: pyOpenRPA_Agent_A2O:: Has recieved result of the activity items from agent! ActivityItem GUID Str: {lActivityReturnItemKeyStr}; Return value: {lActivityReturnItemValue}") - + # Delete the source activity item from AgentDict + if lAgentDictItemKeyTurple in inGSettings["AgentDict"]: + lAgentDictActivityListNew = [] + lAgentDict = inGSettings["AgentDict"][lAgentDictItemKeyTurple] + for lActivityItem in lAgentDict["ActivityList"]: + if lActivityReturnItemKeyStr != lActivityItem.get("GUIDStr",None): + lAgentDictActivityListNew.append(lActivityItem) + else: + del lActivityItem + if lL: lL.debug(f"SERVER: pyOpenRPA_Agent_A2O:: Source activity item request was deleted from the orchestrator. ActivityItem GUID Str: {lActivityReturnItemKeyStr}") + inGSettings["AgentDict"][lAgentDictItemKeyTurple]["ActivityList"] = lAgentDictActivityListNew def SettingsUpdate(inGlobalConfiguration): import os import pyOpenRPA.Orchestrator diff --git a/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py b/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py index 923a0ce1..b1656b60 100644 --- a/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py +++ b/Sources/pyOpenRPA/Orchestrator/SettingsTemplate.py @@ -42,6 +42,8 @@ def __Create__(): "AgentActivityLifetimeSecFloat": 1200.0, # Time in seconds to life for activity for the agent "AgentConnectionLifetimeSecFloat": 300.0, # Time in seconds to handle the open connection to the Agent "AgentLoopSleepSecFloat": 2.0, # Time in seconds to sleep between loops when check to send some activity to the agent + "AgentFileChunkBytesSizeInt": 50000000, # size of the each chunk for the agent transmition + "AgentFileChunkCheckIntervalSecFloat": 0.2, # The interval for check last activity item was successfully sent "WorkingDirectoryPathStr": None, # Will be filled automatically "RequestTimeoutSecFloat": 300, # Time to handle request in seconds, "ListenDict": { # Prototype diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index 366db88b..c9ee1d80 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -26,6 +26,7 @@ from .RobotScreenActive import Monitor #Start robot screen active from . import SettingsTemplate # Settings template import uuid # Generate uuid import datetime # datetime +import math #Единый глобальный словарь (За основу взять из Settings.py) gSettingsDict = None @@ -60,6 +61,27 @@ def AgentActivityItemAdd(inGSettings, inHostNameStr, inUserStr, inActivityItemDi # Return the result return lGUIDStr + +def AgentActivityItemExists(inGSettings, inHostNameStr, inUserStr, inGUIDStr): + """ + Check by GUID if ActivityItem has exists in request list. If exist - the result response has not been recieved from the agent + + :param inGSettings: Global settings dict (singleton) + :param inGUIDStr: GUID String of the ActivityItem + :return: True - ActivityItem is exist in AgentDict ; False - else case + """ + # Check if GUID is exists in dict - has been recieved + # Main alg + lAgentDictItemKeyTurple = (inHostNameStr.upper(),inUserStr.upper()) + lResultBool = False + if lAgentDictItemKeyTurple in inGSettings["AgentDict"]: + inGSettings["AgentDict"][lAgentDictItemKeyTurple] = SettingsTemplate.__AgentDictItemCreate__() + for lActivityItem in inGSettings["AgentDict"][lAgentDictItemKeyTurple]["ActivityList"]: + if inGUIDStr == lActivityItem.get("GUIDStr",None): + lResultBool = True + break + return lResultBool + def AgentActivityItemReturnExists(inGSettings, inGUIDStr): """ Check by GUID if ActivityItem has been executed and result has come to the Orchestrator @@ -117,6 +139,55 @@ def AgentOSCMD(inGSettings, inHostNameStr, inUserStr, inCMDStr, inRunAsyncBool=T #Send item in AgentDict for the futher data transmition return AgentActivityItemAdd(inGSettings=inGSettings, inHostNameStr=inHostNameStr, inUserStr=inUserStr, inActivityItemDict=lActivityItemDict) + +def AgentOSFileSend(inGSettings, inHostNameStr, inUserStr, inOrchestratorFilePathStr, inAgentFilePathStr): + """ + Send the file from the Orchestrator to Agent (synchroniously) pyOpenRPA.Agent daemon process (safe for JSON transmition). + Work safety with big files + + :param inGSettings: Global settings dict (singleton) + :param inHostNameStr: + :param inUserStr: + :param inFilePathStr: + :param inFileDataBytes: + :return: GUID String of the ActivityItem - you can wait (sync or async) result by this guid! + """ + lActivityItemCheckIntervalSecFloat = inGSettings["ServerDict"]["AgentFileChunkCheckIntervalSecFloat"] + + # Get the chunk limit + lChunkByteSizeInt = inGSettings["ServerDict"]["AgentFileChunkBytesSizeInt"] + + lL = inGSettings.get("Logger",None) + + # Open the file and get the size (in bytes) + lFile = open(inOrchestratorFilePathStr,"rb") + lFileSizeBytesInt = lFile.seek(0,2) + lFile.seek(0) + + lChunkCountInt = math.ceil(lFileSizeBytesInt/lChunkByteSizeInt) + if lL: lL.info(f"O2A: Start to send binary file via chunks. Chunk count: {lChunkCountInt}, From (Orch side): {inOrchestratorFilePathStr}, To (Agent side): {inAgentFilePathStr}") + for lChunkNumberInt in range(lChunkCountInt): + # Read chunk + lFileChunkBytes = lFile.read(lChunkByteSizeInt) + # Convert to base64 + lFileChunkBase64Str = base64.b64encode(lFileChunkBytes).decode("utf-8") + # Send chunk + if lChunkNumberInt == 0: + lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrCreate(inGSettings=inGSettings,inHostNameStr=inHostNameStr, + inUserStr=inUserStr,inFilePathStr=inAgentFilePathStr, + inFileDataBase64Str=lFileChunkBase64Str) + else: + lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrAppend(inGSettings=inGSettings, inHostNameStr=inHostNameStr, + inUserStr=inUserStr, inFilePathStr=inAgentFilePathStr, + inFileDataBase64Str=lFileChunkBase64Str) + # Wait for the activity will be deleted + while AgentActivityItemExists(inGSettings=inGSettings,inHostNameStr=inHostNameStr,inUserStr=inUserStr,inGUIDStr=lActivityItemGUIDStr): + time.sleep(lActivityItemCheckIntervalSecFloat) + if lL: lL.debug( + f"O2A: BINARY SEND: Current chunk index: {lChunkNumberInt}") + # Close the file + lFile.close() + def AgentOSFileBinaryDataBytesCreate(inGSettings, inHostNameStr, inUserStr, inFilePathStr, inFileDataBytes): """ Create binary file by the base64 string by the pyOpenRPA.Agent daemon process (safe for JSON transmition) @@ -163,6 +234,30 @@ def AgentOSFileBinaryDataBase64StrCreate(inGSettings, inHostNameStr, inUserStr, #Send item in AgentDict for the futher data transmition return AgentActivityItemAdd(inGSettings=inGSettings, inHostNameStr=inHostNameStr, inUserStr=inUserStr, inActivityItemDict=lActivityItemDict) + +def AgentOSFileBinaryDataBase64StrAppend(inGSettings, inHostNameStr, inUserStr, inFilePathStr, inFileDataBase64Str): + """ + Append binary file by the base64 string by the pyOpenRPA.Agent daemon process (safe for JSON transmission) + + :param inGSettings: Global settings dict (singleton) + :param inHostNameStr: + :param inUserStr: + :param inFilePathStr: + :param inFileDataBase64Str: + :return: GUID String of the ActivityItem - you can wait (sync or async) result by this guid! + """ + + lActivityItemDict = { + "Def":"OSFileBinaryDataBase64StrAppend", # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"]) + "ArgList":[], # Args list + "ArgDict":{"inFilePathStr":inFilePathStr,"inFileDataBase64Str":inFileDataBase64Str}, # Args dictionary + "ArgGSettings": "inGSettings", # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + "ArgLogger": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList) + } + #Send item in AgentDict for the futher data transmition + return AgentActivityItemAdd(inGSettings=inGSettings, inHostNameStr=inHostNameStr, inUserStr=inUserStr, inActivityItemDict=lActivityItemDict) + + # Send text file to Agent (string) def AgentOSFileTextDataStrCreate(inGSettings, inHostNameStr, inUserStr, inFilePathStr, inFileDataStr, inEncodingStr = "utf-8"): """