@ -75,7 +75,6 @@ def AgentActivityItemExists(inGSettings, inHostNameStr, inUserStr, inGUIDStr):
lAgentDictItemKeyTurple = ( inHostNameStr . upper ( ) , inUserStr . upper ( ) )
lAgentDictItemKeyTurple = ( inHostNameStr . upper ( ) , inUserStr . upper ( ) )
lResultBool = False
lResultBool = False
if lAgentDictItemKeyTurple in inGSettings [ " AgentDict " ] :
if lAgentDictItemKeyTurple in inGSettings [ " AgentDict " ] :
inGSettings [ " AgentDict " ] [ lAgentDictItemKeyTurple ] = SettingsTemplate . __AgentDictItemCreate__ ( )
for lActivityItem in inGSettings [ " AgentDict " ] [ lAgentDictItemKeyTurple ] [ " ActivityList " ] :
for lActivityItem in inGSettings [ " AgentDict " ] [ lAgentDictItemKeyTurple ] [ " ActivityList " ] :
if inGUIDStr == lActivityItem . get ( " GUIDStr " , None ) :
if inGUIDStr == lActivityItem . get ( " GUIDStr " , None ) :
lResultBool = True
lResultBool = True
@ -144,6 +143,7 @@ def AgentOSFileSend(inGSettings, inHostNameStr, inUserStr, inOrchestratorFilePat
"""
"""
Send the file from the Orchestrator to Agent ( synchroniously ) pyOpenRPA . Agent daemon process ( safe for JSON transmition ) .
Send the file from the Orchestrator to Agent ( synchroniously ) pyOpenRPA . Agent daemon process ( safe for JSON transmition ) .
Work safety with big files
Work safety with big files
Thread safe - you can call def even if you dont init the orchestrator - def will be executed later
: param inGSettings : Global settings dict ( singleton )
: param inGSettings : Global settings dict ( singleton )
: param inHostNameStr :
: param inHostNameStr :
@ -152,41 +152,58 @@ def AgentOSFileSend(inGSettings, inHostNameStr, inUserStr, inOrchestratorFilePat
: param inFileDataBytes :
: param inFileDataBytes :
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
"""
lActivityItemCheckIntervalSecFloat = inGSettings [ " ServerDict " ] [ " AgentFileChunkCheckIntervalSecFloat " ]
# Get the chunk limit
lChunkByteSizeInt = inGSettings [ " ServerDict " ] [ " AgentFileChunkBytesSizeInt " ]
lL = inGSettings . get ( " Logger " , None )
# Open the file and get the size (in bytes)
# Check thread
lFile = open ( inOrchestratorFilePathStr , " rb " )
if inGSettings [ " ServerDict " ] [ " ServerThread " ] is None :
lFileSizeBytesInt = lFile . seek ( 0 , 2 )
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " AgentOSFileSend run before server init - activity will be append in the processor queue. " )
lFile . seek ( 0 )
lResult = {
" Def " : AgentOSFileSend , # def link or def alias (look gSettings["Processor"]["AliasDefDict"])
lChunkCountInt = math . ceil ( lFileSizeBytesInt / lChunkByteSizeInt )
" ArgList " : [ ] , # Args list
if lL : lL . info ( f " O2A: Start to send binary file via chunks. Chunk count: { lChunkCountInt } , From (Orch side): { inOrchestratorFilePathStr } , To (Agent side): { inAgentFilePathStr } " )
" ArgDict " : { " inHostNameStr " : inHostNameStr , " inUserStr " : inUserStr , " inOrchestratorFilePathStr " : inOrchestratorFilePathStr , " inAgentFilePathStr " : inAgentFilePathStr } , # Args dictionary
for lChunkNumberInt in range ( lChunkCountInt ) :
" ArgGSettings " : " inGSettings " , # Name of GSettings attribute: str (ArgDict) or index (for ArgList)
# Read chunk
" ArgLogger " : None # Name of GSettings attribute: str (ArgDict) or index (for ArgList)
lFileChunkBytes = lFile . read ( lChunkByteSizeInt )
}
# Convert to base64
inGSettings [ " ProcessorDict " ] [ " ActivityList " ] . append ( lResult )
lFileChunkBase64Str = base64 . b64encode ( lFileChunkBytes ) . decode ( " utf-8 " )
else : # In processor - do execution
# Send chunk
lActivityItemCheckIntervalSecFloat = inGSettings [ " ServerDict " ] [ " AgentFileChunkCheckIntervalSecFloat " ]
if lChunkNumberInt == 0 :
lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrCreate ( inGSettings = inGSettings , inHostNameStr = inHostNameStr ,
# Get the chunk limit
inUserStr = inUserStr , inFilePathStr = inAgentFilePathStr ,
lChunkByteSizeInt = inGSettings [ " ServerDict " ] [ " AgentFileChunkBytesSizeInt " ]
inFileDataBase64Str = lFileChunkBase64Str )
else :
lL = inGSettings . get ( " Logger " , None )
lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrAppend ( inGSettings = inGSettings , inHostNameStr = inHostNameStr ,
inUserStr = inUserStr , inFilePathStr = inAgentFilePathStr ,
# Open the file and get the size (in bytes)
inFileDataBase64Str = lFileChunkBase64Str )
lFile = open ( inOrchestratorFilePathStr , " rb " )
# Wait for the activity will be deleted
lFileSizeBytesInt = lFile . seek ( 0 , 2 )
while AgentActivityItemExists ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inGUIDStr = lActivityItemGUIDStr ) :
lFile . seek ( 0 )
time . sleep ( lActivityItemCheckIntervalSecFloat )
#import pdb
if lL : lL . debug (
#pdb.set_trace()
f " O2A: BINARY SEND: Current chunk index: { lChunkNumberInt } " )
lChunkCountInt = math . ceil ( lFileSizeBytesInt / lChunkByteSizeInt )
# Close the file
if lL : lL . info ( f " O2A: Start to send binary file via chunks. Chunk count: { lChunkCountInt } , From (Orch side): { inOrchestratorFilePathStr } , To (Agent side): { inAgentFilePathStr } " )
lFile . close ( )
for lChunkNumberInt in range ( lChunkCountInt ) :
# Read chunk
lFileChunkBytes = lFile . read ( lChunkByteSizeInt )
# Convert to base64
lFileChunkBase64Str = base64 . b64encode ( lFileChunkBytes ) . decode ( " utf-8 " )
# Send chunk
if lChunkNumberInt == 0 :
lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrCreate ( inGSettings = inGSettings , inHostNameStr = inHostNameStr ,
inUserStr = inUserStr , inFilePathStr = inAgentFilePathStr ,
inFileDataBase64Str = lFileChunkBase64Str )
else :
lActivityItemGUIDStr = AgentOSFileBinaryDataBase64StrAppend ( inGSettings = inGSettings , inHostNameStr = inHostNameStr ,
inUserStr = inUserStr , inFilePathStr = inAgentFilePathStr ,
inFileDataBase64Str = lFileChunkBase64Str )
# Wait for the activity will be deleted
while AgentActivityItemExists ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inGUIDStr = lActivityItemGUIDStr ) :
time . sleep ( lActivityItemCheckIntervalSecFloat )
if lL : lL . debug (
f " O2A: BINARY SEND: Current chunk index: { lChunkNumberInt } " )
if lL : lL . info (
f " O2A: BINARY SEND: Transmition has been complete " )
# Close the file
lFile . close ( )
def AgentOSFileBinaryDataBytesCreate ( inGSettings , inHostNameStr , inUserStr , inFilePathStr , inFileDataBytes ) :
def AgentOSFileBinaryDataBytesCreate ( inGSettings , inHostNameStr , inUserStr , inFilePathStr , inFileDataBytes ) :
"""
"""
@ -2131,6 +2148,7 @@ def Orchestrator(inGSettings, inDumpRestoreBool = True, inRunAsAdministratorBool
lItemDict = lListenDict [ lItemKeyStr ]
lItemDict = lListenDict [ lItemKeyStr ]
lThreadServer = Server . RobotDaemonServer ( lItemKeyStr , gSettingsDict )
lThreadServer = Server . RobotDaemonServer ( lItemKeyStr , gSettingsDict )
lThreadServer . start ( )
lThreadServer . start ( )
gSettingsDict [ " ServerDict " ] [ " ServerThread " ] = lThreadServer
lItemDict [ " ServerInstance " ] = lThreadServer
lItemDict [ " ServerInstance " ] = lThreadServer
# Init the RobotScreenActive in another thread
# Init the RobotScreenActive in another thread