draft 1.2.4 (need test)

# Autoremove ActivityItem from Orc if agetn was complete this activity
# new def Orchestrator.AgentOSFileSend
# new def Orchestrator.AgentOSFileBinaryDataBase64StrAppend
# new def Agent.AgentOSFileBinaryDataBase64StrAppend
dev-linux
Ivan Maslov 3 years ago
parent 13cc33cd12
commit e361775027

@ -17,6 +17,20 @@ def OSFileBinaryDataBase64StrCreate(inFilePathStr, inFileDataBase64Str,inGSettin
if lL: lL.info(lMessageStr)
A2O.LogListSend(inGSettings=inGSettings, inLogList=[lMessageStr])
# Append binary file by the base64 string (safe for JSON transmition)
def OSFileBinaryDataBase64StrAppend(inFilePathStr, inFileDataBase64Str,inGSettings = None):
"""
Create binary file by the base64 string (safe for JSON transmition)
"""
lFile = open(inFilePathStr, "ab")
lFile.write(base64.b64decode(inFileDataBase64Str))
lFile.close()
lL = inGSettings.get("Logger", None) if type(inGSettings) is dict else None
lMessageStr = f"AGENT Binary file {inFilePathStr} has been appended."
if lL: lL.info(lMessageStr)
A2O.LogListSend(inGSettings=inGSettings, inLogList=[lMessageStr])
# Create text file by the string
def OSFileTextDataStrCreate(inFilePathStr, inFileDataStr, inEncodingStr = "utf-8",inGSettings = None):
"""

@ -469,3 +469,15 @@ def Update(inGSettings):
}
if lL: lL.warning(
f"Backward compatibility (v1.2.2 to v1.2.3): Add new key ServerDict > ListenDict. Transfer port from ServerDict > ListenPort") # Log about compatibility
# Add new key
#"ServerDict": {
# "AgentFileChunkBytesSizeInt": 50000000, # size of the each chunk for the agent transmition
# "AgentFileChunkCheckIntervalSecFloat": 0.2, # The interval for check last activity item was successfully sent
if "AgentFileChunkBytesSizeInt" not in inGSettings["ServerDict"]:
inGSettings["ServerDict"]["AgentFileChunkBytesSizeInt"]= 50000000
if lL: lL.warning(
f"Backward compatibility (v1.2.3 to v1.2.4): Add new key ServerDict > AgentFileChunkBytesSizeInt") # Log about compatibility
if "AgentFileChunkCheckIntervalSecFloat" not in inGSettings["ServerDict"]:
inGSettings["ServerDict"]["AgentFileChunkCheckIntervalSecFloat"]= 0.2
if lL: lL.warning(
f"Backward compatibility (v1.2.3 to v1.2.4): Add new key ServerDict > AgentFileChunkCheckIntervalSecFloat") # Log about compatibility

@ -417,6 +417,7 @@ def pyOpenRPA_Agent_A2O(inRequest, inGSettings):
lInputByteArray = inRequest.rfile.read(lInputByteArrayLength)
# Превращение массива байт в объект
lInput = json.loads(lInputByteArray.decode('utf8'))
lAgentDictItemKeyTurple = (lInput["HostNameUpperStr"], lInput["UserUpperStr"])
if "LogList" in lInput:
for lLogItemStr in lInput["LogList"]:
inGSettings["Logger"].info(lLogItemStr)
@ -426,7 +427,17 @@ def pyOpenRPA_Agent_A2O(inRequest, inGSettings):
# Create item in gSettings
inGSettings["AgentActivityReturnDict"][lActivityReturnItemKeyStr]=SettingsTemplate.__AgentActivityReturnDictItemCreate__(inReturn=lActivityReturnItemValue)
if lL: lL.debug(f"SERVER: pyOpenRPA_Agent_A2O:: Has recieved result of the activity items from agent! ActivityItem GUID Str: {lActivityReturnItemKeyStr}; Return value: {lActivityReturnItemValue}")
# Delete the source activity item from AgentDict
if lAgentDictItemKeyTurple in inGSettings["AgentDict"]:
lAgentDictActivityListNew = []
lAgentDict = inGSettings["AgentDict"][lAgentDictItemKeyTurple]
for lActivityItem in lAgentDict["ActivityList"]:
if lActivityReturnItemKeyStr != lActivityItem.get("GUIDStr",None):
lAgentDictActivityListNew.append(lActivityItem)
else:
del lActivityItem
if lL: lL.debug(f"SERVER: pyOpenRPA_Agent_A2O:: Source activity item request was deleted from the orchestrator. ActivityItem GUID Str: {lActivityReturnItemKeyStr}")
inGSettings["AgentDict"][lAgentDictItemKeyTurple]["ActivityList"] = lAgentDictActivityListNew
def SettingsUpdate(inGlobalConfiguration):
import os
import pyOpenRPA.Orchestrator

@ -42,6 +42,8 @@ def __Create__():
"AgentActivityLifetimeSecFloat": 1200.0, # Time in seconds to life for activity for the agent
"AgentConnectionLifetimeSecFloat": 300.0, # Time in seconds to handle the open connection to the Agent
"AgentLoopSleepSecFloat": 2.0, # Time in seconds to sleep between loops when check to send some activity to the agent
"AgentFileChunkBytesSizeInt": 50000000, # size of the each chunk for the agent transmition
"AgentFileChunkCheckIntervalSecFloat": 0.2, # The interval for check last activity item was successfully sent
"WorkingDirectoryPathStr": None, # Will be filled automatically
"RequestTimeoutSecFloat": 300, # Time to handle request in seconds,
"ListenDict": { # Prototype

@ -26,6 +26,7 @@ from .RobotScreenActive import Monitor #Start robot screen active
from . import SettingsTemplate # Settings template
import uuid # Generate uuid
import datetime # datetime
import math
#Единый глобальный словарь (За основу взять из Settings.py)
gSettingsDict = None
@ -60,6 +61,27 @@ def AgentActivityItemAdd(inGSettings, inHostNameStr, inUserStr, inActivityItemDi
# Return the result
return lGUIDStr
def AgentActivityItemExists(inGSettings, inHostNameStr, inUserStr, inGUIDStr):
"""
Check by GUID if ActivityItem has exists in request list. If exist - the result response has not been recieved from the agent
:param inGSettings: Global settings dict (singleton)
:param inGUIDStr: GUID String of the ActivityItem
:return: True - ActivityItem is exist in AgentDict ; False - else case
"""
# Check if GUID is exists in dict - has been recieved
# Main alg
lAgentDictItemKeyTurple = (inHostNameStr.upper(),inUserStr.upper())
lResultBool = False
if lAgentDictItemKeyTurple in inGSettings["AgentDict"]:
inGSettings["AgentDict"][lAgentDictItemKeyTurple] = SettingsTemplate.__AgentDictItemCreate__()
for lActivityItem in inGSettings["AgentDict"][lAgentDictItemKeyTurple]["ActivityList"]:
if inGUIDStr == lActivityItem.get("GUIDStr",None):
lResultBool = True
break
return lResultBool
def AgentActivityItemReturnExists(inGSettings, inGUIDStr):
"""
Check by GUID if ActivityItem has been executed and result has come to the Orchestrator
@ -117,6 +139,55 @@ def AgentOSCMD(inGSettings, inHostNameStr, inUserStr, inCMDStr, inRunAsyncBool=T
#Send item in AgentDict for the futher data transmition
return AgentActivityItemAdd(inGSettings=inGSettings, inHostNameStr=inHostNameStr, inUserStr=inUserStr, inActivityItemDict=lActivityItemDict)
def AgentOSFileSend(inGSettings, inHostNameStr, inUserStr, inOrchestratorFilePathStr, inAgentFilePathStr):
"""
Send the file from the Orchestrator to Agent (synchroniously) pyOpenRPA.Agent daemon process (safe for JSON transmition).
Work safety with big files
:param inGSettings: Global settings dict (singleton)
:param inHostNameStr:
:param inUserStr:
:param inFilePathStr:
:param inFileDataBytes:
:return: GUID String of the ActivityItem - you can wait (sync or async) result by this guid!
"""
lActivityItemCheckIntervalSecFloat = inGSettings["ServerDict"]["AgentFileChunkCheckIntervalSecFloat"]
# Get the chunk limit
lChunkByteSizeInt = inGSettings["ServerDict"]["AgentFileChunkBytesSizeInt"]
lL = inGSettings.get("Logger",None)
# Open the file and get the size (in bytes)
lFile = open(inOrchestratorFilePathStr,"rb")
lFileSizeBytesInt = lFile.seek(0,2)
lFile.seek(0)
lChunkCountInt = math.ceil(lFileSizeBytesInt/lChunkByteSizeInt)
if lL: lL.info(f"O2A: Start to send binary file via chunks. Chunk count: {lChunkCountInt}, From (Orch side): {inOrchestratorFilePathStr}, To (Agent side): {inAgentFilePathStr}")
for lChunkNumberInt in range(lChunkCountInt):
# Read chunk
lFileChunkBytes = lFile.read(lChunkByteSizeInt)
# Convert to base64
lFileChunkBase64Str = base64.b64encode(lFileChunkBytes).decode("utf-8")
# Send chunk
if lChunkNumberInt == 0:
lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrCreate(inGSettings=inGSettings,inHostNameStr=inHostNameStr,
inUserStr=inUserStr,inFilePathStr=inAgentFilePathStr,
inFileDataBase64Str=lFileChunkBase64Str)
else:
lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrAppend(inGSettings=inGSettings, inHostNameStr=inHostNameStr,
inUserStr=inUserStr, inFilePathStr=inAgentFilePathStr,
inFileDataBase64Str=lFileChunkBase64Str)
# Wait for the activity will be deleted
while AgentActivityItemExists(inGSettings=inGSettings,inHostNameStr=inHostNameStr,inUserStr=inUserStr,inGUIDStr=lActivityItemGUIDStr):
time.sleep(lActivityItemCheckIntervalSecFloat)
if lL: lL.debug(
f"O2A: BINARY SEND: Current chunk index: {lChunkNumberInt}")
# Close the file
lFile.close()
def AgentOSFileBinaryDataBytesCreate(inGSettings, inHostNameStr, inUserStr, inFilePathStr, inFileDataBytes):
"""
Create binary file by the base64 string by the pyOpenRPA.Agent daemon process (safe for JSON transmition)
@ -163,6 +234,30 @@ def AgentOSFileBinaryDataBase64StrCreate(inGSettings, inHostNameStr, inUserStr,
#Send item in AgentDict for the futher data transmition
return AgentActivityItemAdd(inGSettings=inGSettings, inHostNameStr=inHostNameStr, inUserStr=inUserStr, inActivityItemDict=lActivityItemDict)
def AgentOSFileBinaryDataBase64StrAppend(inGSettings, inHostNameStr, inUserStr, inFilePathStr, inFileDataBase64Str):
"""
Append binary file by the base64 string by the pyOpenRPA.Agent daemon process (safe for JSON transmission)
:param inGSettings: Global settings dict (singleton)
:param inHostNameStr:
:param inUserStr:
:param inFilePathStr:
:param inFileDataBase64Str:
:return: GUID String of the ActivityItem - you can wait (sync or async) result by this guid!
"""
lActivityItemDict = {
"Def":"OSFileBinaryDataBase64StrAppend", # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
"ArgList":[], # Args list
"ArgDict":{"inFilePathStr":inFilePathStr,"inFileDataBase64Str":inFileDataBase64Str}, # Args dictionary
"ArgGSettings": "inGSettings", # Name of GSettings attribute: str (ArgDict) or index (for ArgList)
"ArgLogger": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList)
}
#Send item in AgentDict for the futher data transmition
return AgentActivityItemAdd(inGSettings=inGSettings, inHostNameStr=inHostNameStr, inUserStr=inUserStr, inActivityItemDict=lActivityItemDict)
# Send text file to Agent (string)
def AgentOSFileTextDataStrCreate(inGSettings, inHostNameStr, inUserStr, inFilePathStr, inFileDataStr, inEncodingStr = "utf-8"):
"""

Loading…
Cancel
Save