@ -33,7 +33,7 @@ gSettingsDict = None
# AGENT DEFS
def AgentActivityItemAdd ( in GSettings, in HostNameStr, inUserStr , inActivityItemDict ) :
def AgentActivityItemAdd ( in HostNameStr, inUserStr , inActivityItemDict , inGSettings = None ) :
"""
Add activity in AgentDict . Check if item is created
@ -43,6 +43,8 @@ def AgentActivityItemAdd(inGSettings, inHostNameStr, inUserStr, inActivityItemDi
: param inActivityItemDict : ActivityItem
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = copy . deepcopy ( inActivityItemDict )
# Add GUIDStr if not exist
lGUIDStr = None
@ -62,7 +64,7 @@ def AgentActivityItemAdd(inGSettings, inHostNameStr, inUserStr, inActivityItemDi
return lGUIDStr
def AgentActivityItemExists ( in GSettings, in HostNameStr, inUserStr , inGUIDStr ) :
def AgentActivityItemExists ( in HostNameStr, inUserStr , inGUIDStr , inGSettings = None ) :
"""
Check by GUID if ActivityItem has exists in request list . If exist - the result response has not been recieved from the agent
@ -71,6 +73,7 @@ def AgentActivityItemExists(inGSettings, inHostNameStr, inUserStr, inGUIDStr):
: return : True - ActivityItem is exist in AgentDict ; False - else case
"""
# Check if GUID is exists in dict - has been recieved
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Main alg
lAgentDictItemKeyTurple = ( inHostNameStr . upper ( ) , inUserStr . upper ( ) )
lResultBool = False
@ -81,7 +84,7 @@ def AgentActivityItemExists(inGSettings, inHostNameStr, inUserStr, inGUIDStr):
break
return lResultBool
def AgentActivityItemReturnExists ( inG Settings, inGUIDStr ) :
def AgentActivityItemReturnExists ( inG UIDStr, inGSettings = None ) :
"""
Check by GUID if ActivityItem has been executed and result has come to the Orchestrator
@ -89,11 +92,13 @@ def AgentActivityItemReturnExists(inGSettings, inGUIDStr):
: param inGUIDStr : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
: return : True - result has been received from the Agent to orc ; False - else case
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check if GUID is exists in dict - has been recieved
return inGUIDStr in inGSettings [ " AgentActivityReturnDict " ]
def AgentActivityItemReturnGet ( inG Settings, inG UIDStr, inCheckIntervalSecFloat = 0.5 ) :
def AgentActivityItemReturnGet ( inG UIDStr, inCheckIntervalSecFloat = 0.5 , inGSettings = None ) :
"""
Work synchroniously ! Wait while result will be recieved . Get the result of the ActivityItem execution on the Agent side . Before this please check by the def AgentActivityItemReturnExists that result has come to the Orchestrator
@ -104,6 +109,7 @@ def AgentActivityItemReturnGet(inGSettings, inGUIDStr, inCheckIntervalSecFloat =
: param inCheckIntervalSecFloat : Interval in sec of the check Activity Item result
: return : Result of the ActivityItem executed on the Agent side anr transmitted to the Orchestrator . IMPORTANT ! ONLY JSON ENABLED Types CAN BE TRANSMITTED TO ORCHESTRATOR !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
#Check if Orchestrator has been initialized - else raise exception
if Core . IsOrchestratorInitialized ( inGSettings = inGSettings ) == True :
# Wait while result will not come here
@ -114,7 +120,7 @@ def AgentActivityItemReturnGet(inGSettings, inGUIDStr, inCheckIntervalSecFloat =
else :
raise Exception ( f " __Orchestrator__.AgentActivityItemReturnGet !ATTENTION! Use this function only after Orchestrator initialization! Before orchestrator init exception will be raised. " )
def AgentOSCMD ( in GSettings, in HostNameStr, inUserStr , inCMDStr , inRunAsyncBool = True , inSendOutputToOrchestratorLogsBool = True , inCMDEncodingStr = " cp1251 " ) :
def AgentOSCMD ( in HostNameStr, inUserStr , inCMDStr , inRunAsyncBool = True , inSendOutputToOrchestratorLogsBool = True , inCMDEncodingStr = " cp1251 " , inGSettings = None ) :
"""
Send CMD to OS thought the pyOpenRPA . Agent daemon . Result return to log + Orchestrator by the A2O connection
@ -127,7 +133,7 @@ def AgentOSCMD(inGSettings, inHostNameStr, inUserStr, inCMDStr, inRunAsyncBool=T
: param inCMDEncodingStr : Set the encoding of the DOS window on the Agent server session . Windows is beautiful : ) . Default is " cp1251 " early was " cp866 " - need test
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " OSCMD " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -139,7 +145,7 @@ def AgentOSCMD(inGSettings, inHostNameStr, inUserStr, inCMDStr, inRunAsyncBool=T
return AgentActivityItemAdd ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inActivityItemDict = lActivityItemDict )
def AgentOSFileSend ( in GSettings, in HostNameStr, inUserStr , inOrchestratorFilePathStr , inAgentFilePathStr ) :
def AgentOSFileSend ( in HostNameStr, inUserStr , inOrchestratorFilePathStr , inAgentFilePathStr , inGSettings = None ) :
"""
Send the file from the Orchestrator to Agent ( synchroniously ) pyOpenRPA . Agent daemon process ( safe for JSON transmition ) .
Work safety with big files
@ -153,7 +159,7 @@ def AgentOSFileSend(inGSettings, inHostNameStr, inUserStr, inOrchestratorFilePat
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if inGSettings [ " ServerDict " ] [ " ServerThread " ] is None :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " AgentOSFileSend run before server init - activity will be append in the processor queue. " )
@ -205,7 +211,7 @@ def AgentOSFileSend(inGSettings, inHostNameStr, inUserStr, inOrchestratorFilePat
# Close the file
lFile . close ( )
def AgentOSFileBinaryDataBytesCreate ( in GSettings, in HostNameStr, inUserStr , inFilePathStr , inFileDataBytes ) :
def AgentOSFileBinaryDataBytesCreate ( in HostNameStr, inUserStr , inFilePathStr , inFileDataBytes , inGSettings = None ) :
"""
Create binary file by the base64 string by the pyOpenRPA . Agent daemon process ( safe for JSON transmition )
@ -216,7 +222,7 @@ def AgentOSFileBinaryDataBytesCreate(inGSettings, inHostNameStr, inUserStr, inFi
: param inFileDataBytes :
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lFileDataBase64Str = base64 . b64encode ( inFileDataBytes ) . decode ( " utf-8 " )
lActivityItemDict = {
" Def " : " OSFileBinaryDataBase64StrCreate " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
@ -229,7 +235,7 @@ def AgentOSFileBinaryDataBytesCreate(inGSettings, inHostNameStr, inUserStr, inFi
return AgentActivityItemAdd ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inActivityItemDict = lActivityItemDict )
def AgentOSFileBinaryDataBase64StrCreate ( in GSettings, in HostNameStr, inUserStr , inFilePathStr , inFileDataBase64Str ) :
def AgentOSFileBinaryDataBase64StrCreate ( in HostNameStr, inUserStr , inFilePathStr , inFileDataBase64Str , inGSettings = None ) :
"""
Create binary file by the base64 string by the pyOpenRPA . Agent daemon process ( safe for JSON transmission )
@ -240,7 +246,7 @@ def AgentOSFileBinaryDataBase64StrCreate(inGSettings, inHostNameStr, inUserStr,
: param inFileDataBase64Str :
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " OSFileBinaryDataBase64StrCreate " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -252,7 +258,7 @@ def AgentOSFileBinaryDataBase64StrCreate(inGSettings, inHostNameStr, inUserStr,
return AgentActivityItemAdd ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inActivityItemDict = lActivityItemDict )
def AgentOSFileBinaryDataBase64StrAppend ( in GSettings, in HostNameStr, inUserStr , inFilePathStr , inFileDataBase64Str ) :
def AgentOSFileBinaryDataBase64StrAppend ( in HostNameStr, inUserStr , inFilePathStr , inFileDataBase64Str , inGSettings = None ) :
"""
Append binary file by the base64 string by the pyOpenRPA . Agent daemon process ( safe for JSON transmission )
@ -263,7 +269,7 @@ def AgentOSFileBinaryDataBase64StrAppend(inGSettings, inHostNameStr, inUserStr,
: param inFileDataBase64Str :
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " OSFileBinaryDataBase64StrAppend " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -276,7 +282,7 @@ def AgentOSFileBinaryDataBase64StrAppend(inGSettings, inHostNameStr, inUserStr,
# Send text file to Agent (string)
def AgentOSFileTextDataStrCreate ( in GSettings, in HostNameStr, inUserStr , inFilePathStr , inFileDataStr , inEncodingStr = " utf-8 " ) :
def AgentOSFileTextDataStrCreate ( in HostNameStr, inUserStr , inFilePathStr , inFileDataStr , inEncodingStr = " utf-8 " , inGSettings = None ) :
"""
Create text file by the string by the pyOpenRPA . Agent daemon process
@ -288,7 +294,7 @@ def AgentOSFileTextDataStrCreate(inGSettings, inHostNameStr, inUserStr, inFilePa
: param inEncodingStr :
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " OSFileTextDataStrCreate " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -299,7 +305,7 @@ def AgentOSFileTextDataStrCreate(inGSettings, inHostNameStr, inUserStr, inFilePa
#Send item in AgentDict for the futher data transmition
return AgentActivityItemAdd ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inActivityItemDict = lActivityItemDict )
def AgentOSFileBinaryDataBase64StrReceive ( in GSettings, in HostNameStr, inUserStr , inFilePathStr ) :
def AgentOSFileBinaryDataBase64StrReceive ( in HostNameStr, inUserStr , inFilePathStr , inGSettings = None ) :
"""
Read binary file and encode in base64 to transmit ( safe for JSON transmition )
@ -309,7 +315,7 @@ def AgentOSFileBinaryDataBase64StrReceive(inGSettings, inHostNameStr, inUserStr,
: param inFilePathStr : File path to read
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " OSFileBinaryDataBase64StrReceive " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -320,7 +326,7 @@ def AgentOSFileBinaryDataBase64StrReceive(inGSettings, inHostNameStr, inUserStr,
#Send item in AgentDict for the futher data transmition
return AgentActivityItemAdd ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inActivityItemDict = lActivityItemDict )
def AgentOSFileTextDataStrReceive ( in GSettings, in HostNameStr, inUserStr , inFilePathStr , inEncodingStr = " utf-8 " ) :
def AgentOSFileTextDataStrReceive ( in HostNameStr, inUserStr , inFilePathStr , inEncodingStr = " utf-8 " , inGSettings = None ) :
"""
Read text file in the agent GUI session
@ -331,7 +337,7 @@ def AgentOSFileTextDataStrReceive(inGSettings, inHostNameStr, inUserStr, inFileP
: param inEncodingStr : Text file encoding . Default ' utf-8 '
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " OSFileTextDataStrReceive " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -342,7 +348,7 @@ def AgentOSFileTextDataStrReceive(inGSettings, inHostNameStr, inUserStr, inFileP
#Send item in AgentDict for the futher data transmition
return AgentActivityItemAdd ( inGSettings = inGSettings , inHostNameStr = inHostNameStr , inUserStr = inUserStr , inActivityItemDict = lActivityItemDict )
def AgentProcessWOExeUpperUserListGet ( in GSettings, in HostNameStr, inUserStr ) :
def AgentProcessWOExeUpperUserListGet ( in HostNameStr, inUserStr , inGSettings = None ) :
"""
Return the process list only for the current user ( where Agent is running ) without . EXE in upper case . Can use in ActivityItem from Orchestrator to Agent
@ -351,7 +357,7 @@ def AgentProcessWOExeUpperUserListGet(inGSettings, inHostNameStr, inUserStr):
: param inUserStr :
: return : GUID String of the ActivityItem - you can wait ( sync or async ) result by this guid !
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lActivityItemDict = {
" Def " : " ProcessWOExeUpperUserListGet " , # def alias (look pyOpeRPA.Agent gSettings["ProcessorDict"]["AliasDefDict"])
" ArgList " : [ ] , # Args list
@ -382,7 +388,7 @@ def OSCredentialsVerify(inUserStr, inPasswordStr, inDomainStr=""): ##
else :
return True
def OSRemotePCRestart ( in Logger, in HostStr, inForceBool = Tru e) :
def OSRemotePCRestart ( in HostStr, inForceBool = Tru e, inLogger = Non e) :
"""
Send signal via power shell to restart remote PC
ATTENTION : Orchestrator user need to have restart right on the Remote machine to restart PC .
@ -392,6 +398,7 @@ def OSRemotePCRestart(inLogger, inHostStr, inForceBool=True):
: param inForceBool : True - send signal to force retart PC ; False - else case
: return :
"""
if inLogger is None : inLogger = OrchestratorLoggerGet ( )
lCMDStr = f " powershell -Command Restart-Computer -ComputerName { inHostStr } "
if inForceBool == True : lCMDStr = lCMDStr + " -Force "
OSCMD ( inCMDStr = lCMDStr , inLogger = inLogger )
@ -405,6 +412,7 @@ def OSCMD(inCMDStr, inRunAsyncBool=True, inLogger = None):
: param inLogger :
: return : CMD result string
"""
if inLogger is None : inLogger = OrchestratorLoggerGet ( )
lResultStr = " "
# Subdef to listen OS result
def _CMDRunAndListenLogs ( inCMDStr , inLogger ) :
@ -443,6 +451,7 @@ def OrchestratorRestart(inGSettings=None):
: param inGSettings : Global settings dict ( singleton )
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
OrchestratorSessionSave ( inGSettings = inGSettings ) # Dump RDP List in file json
if inGSettings is not None :
lL = inGSettings [ " Logger " ]
@ -451,6 +460,14 @@ def OrchestratorRestart(inGSettings=None):
os . execl ( sys . executable , os . path . abspath ( __file__ ) , * sys . argv )
sys . exit ( 0 )
def OrchestratorLoggerGet ( ) :
"""
Get the logger from the Orchestrator
: return :
"""
return GSettingsGet ( ) . get ( " Logger " , None )
def OrchestratorIsAdmin ( ) :
"""
Check if Orchestrator process is running as administrator
@ -474,7 +491,7 @@ def OrchestratorRerunAsAdmin():
else :
print ( f " !SKIPPED! Already run as administrator! " )
def OrchestratorSessionSave ( inGSettings ) :
def OrchestratorSessionSave ( inGSettings = None ) :
"""
Orchestrator session save in file
_SessionLast_RDPList . json ( encoding = " utf-8 " )
@ -483,6 +500,7 @@ def OrchestratorSessionSave(inGSettings):
: param inGSettings : Global settings dict ( singleton )
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lL = inGSettings [ " Logger " ]
try :
# Dump RDP List in file json
@ -502,7 +520,7 @@ def OrchestratorSessionSave(inGSettings):
if lL : lL . exception ( f " Exception when dump data before restart the Orchestrator " )
return True
def OrchestratorSessionRestore ( inGSettings ) :
def OrchestratorSessionRestore ( inGSettings = None ) :
"""
Check _SessionLast_RDPList . json and _SessionLast_StorageDict . pickle in working directory . if exist - load into gsettings
# _SessionLast_StorageDict.pickle (binary)
@ -512,6 +530,7 @@ def OrchestratorSessionRestore(inGSettings):
: param inGSettings : Global settings dict ( singleton )
: return :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lL = inGSettings . get ( " Logger " , None )
# _SessionLast_RDPList.json (encoding = "utf-8")
if os . path . exists ( " _SessionLast_RDPList.json " ) :
@ -551,7 +570,7 @@ def UACUserDictGet(inRequest) -> dict:
"""
return inRequest . UserRoleHierarchyGet ( ) # get the Hierarchy
def UACUpdate ( in GSettings, in ADLoginStr, inADStr = " " , inADIsDefaultBool = True , inURLList = None , inRoleHierarchyAllowedDict = None ) :
def UACUpdate ( in ADLoginStr, inADStr = " " , inADIsDefaultBool = True , inURLList = None , inRoleHierarchyAllowedDict = None , inGSettings = None ) :
"""
Update user access ( UAC )
@ -562,6 +581,7 @@ def UACUpdate(inGSettings, inADLoginStr, inADStr="", inADIsDefaultBool=True, inU
: param inURLList :
: param inRoleHierarchyAllowedDict :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lUserTurple = ( inADStr . upper ( ) , inADLoginStr . upper ( ) ) # Create turple key for inGSettings["ServerDict"]["AccessUsers"]["RuleDomainUserDict"]
if inURLList is None : inURLList = [ ] # Check if None
if inRoleHierarchyAllowedDict is None : inRoleHierarchyAllowedDict = { } # Check if None
@ -586,13 +606,14 @@ def UACUpdate(inGSettings, inADLoginStr, inADStr="", inADIsDefaultBool=True, inU
# Case add default domain + user
inGSettings [ " ServerDict " ] [ " AccessUsers " ] [ " RuleDomainUserDict " ] . update ( { ( " " , inADLoginStr . upper ( ) ) : lRuleDomainUserDict } )
def UACSuperTokenUpdate ( in GSettings, inSuperTokenStr ) :
def UACSuperTokenUpdate ( in SuperTokenStr, inGSettings = None ) :
"""
Add supertoken for the all access ( it is need for the robot communication without human )
: param inGSettings : Global settings dict ( singleton )
: param inSuperTokenStr :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lLoginStr = " SUPERTOKEN "
UACUpdate ( inGSettings = inGSettings , inADLoginStr = lLoginStr )
inGSettings [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] . update (
@ -604,7 +625,7 @@ def UACSuperTokenUpdate(inGSettings, inSuperTokenStr):
# # # # # # # # # # # # # # # # # # # # # # #
def WebURLConnectDef ( in GSettings, in MethodStr, inURLStr , inMatchTypeStr , inDef , inContentTypeStr = " application/octet-stream " ) :
def WebURLConnectDef ( in MethodStr, inURLStr , inMatchTypeStr , inDef , inContentTypeStr = " application/octet-stream " , inGSettings = None ) :
"""
Connect URL to DEF
" inMethodStr " : " GET|POST " ,
@ -620,6 +641,7 @@ def WebURLConnectDef(inGSettings, inMethodStr, inURLStr, inMatchTypeStr, inDef,
: param inDef :
: param inContentTypeStr :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lURLItemDict = {
" Method " : inMethodStr . upper ( ) ,
" URL " : inURLStr , # URL of the request
@ -633,7 +655,7 @@ def WebURLConnectDef(inGSettings, inMethodStr, inURLStr, inMatchTypeStr, inDef,
inGSettings [ " ServerDict " ] [ " URLList " ] . append ( lURLItemDict )
def WebURLConnectFolder ( in GSettings, in MethodStr, inURLStr , inMatchTypeStr , inFolderPathStr ) :
def WebURLConnectFolder ( in MethodStr, inURLStr , inMatchTypeStr , inFolderPathStr , inGSettings = None ) :
"""
Connect URL to Folder
" inMethodStr " : " GET|POST " ,
@ -647,6 +669,7 @@ def WebURLConnectFolder(inGSettings, inMethodStr, inURLStr, inMatchTypeStr, inFo
: param inMatchTypeStr :
: param inFolderPathStr :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check if last symbol is "/" - append if not exist
lFolderPathStr = os . path . abspath ( inFolderPathStr )
if lFolderPathStr [ - 1 ] != " / " : lFolderPathStr + = " / "
@ -663,7 +686,7 @@ def WebURLConnectFolder(inGSettings, inMethodStr, inURLStr, inMatchTypeStr, inFo
inGSettings [ " ServerDict " ] [ " URLList " ] . append ( lURLItemDict )
def WebURLConnectFile ( in GSettings, in MethodStr, inURLStr , inMatchTypeStr , inFilePathStr , inContentTypeStr = " application/octet-stream " ) :
def WebURLConnectFile ( in MethodStr, inURLStr , inMatchTypeStr , inFilePathStr , inContentTypeStr = " application/octet-stream " , inGSettings = None ) :
"""
Connect URL to File
" inMethodStr " : " GET|POST " ,
@ -678,6 +701,7 @@ def WebURLConnectFile(inGSettings, inMethodStr, inURLStr, inMatchTypeStr, inFile
: param inFilePathStr :
: param inContentTypeStr :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lURLItemDict = {
" Method " : inMethodStr . upper ( ) ,
" URL " : inURLStr , # URL of the request
@ -689,7 +713,7 @@ def WebURLConnectFile(inGSettings, inMethodStr, inURLStr, inMatchTypeStr, inFile
}
inGSettings [ " ServerDict " ] [ " URLList " ] . append ( lURLItemDict )
def WebListenCreate ( in GSettings, in ServerKeyStr= " Default " , inAddressStr = " " , inPortInt = 80 , inCertFilePEMPathStr = None , inKeyFilePathStr = None ) :
def WebListenCreate ( in ServerKeyStr= " Default " , inAddressStr = " " , inPortInt = 80 , inCertFilePEMPathStr = None , inKeyFilePathStr = None , inGSettings = None ) :
"""
Create listen interface for the web server
@ -700,7 +724,7 @@ def WebListenCreate(inGSettings, inServerKeyStr="Default", inAddressStr="", inPo
: param inKeyFilePathStr : Path to the private key file
: return :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
inGSettings [ " ServerDict " ] [ " ListenDict " ] [ inServerKeyStr ] = {
" AddressStr " : inAddressStr ,
" PortInt " : inPortInt ,
@ -710,7 +734,7 @@ def WebListenCreate(inGSettings, inServerKeyStr="Default", inAddressStr="", inPo
}
def WebCPUpdate ( in GSettings, in CPKeyStr, inHTMLRenderDef = None , inJSONGeneratorDef = None , inJSInitGeneratorDef = None ) :
def WebCPUpdate ( in CPKeyStr, inHTMLRenderDef = None , inJSONGeneratorDef = None , inJSInitGeneratorDef = None , inGSettings = None ) :
"""
Add control panel HTML , JSON generator or JS when page init
@ -720,6 +744,7 @@ def WebCPUpdate(inGSettings, inCPKeyStr, inHTMLRenderDef=None, inJSONGeneratorDe
: param inJSONGeneratorDef :
: param inJSInitGeneratorDef :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Create Struct if the re is current key
if inCPKeyStr not in inGSettings [ " CPDict " ] :
inGSettings [ " CPDict " ] [ inCPKeyStr ] = { " HTMLRenderDef " : None , " JSONGeneratorDef " : None , " JSInitGeneratorDef " : None }
@ -836,7 +861,7 @@ def WebUserInfoGet(inRequest):
lUserUpperStr = inRequest . OpenRPA [ " User " ] . upper ( )
return { " DomainUpperStr " : lDomainUpperStr , " UserNameUpperStr " : lUserUpperStr }
def WebUserIsSuperToken ( inRequest , inGSettings ) :
def WebUserIsSuperToken ( inRequest , inGSettings = None ) :
"""
Return bool if request is authentificated with supetoken ( token which is never expires )
@ -844,6 +869,7 @@ def WebUserIsSuperToken(inRequest, inGSettings):
: param inGSettings : Global settings dict ( singleton )
: return : bool True - is supertoken ; False - is not supertoken
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lIsSuperTokenBool = False
# Get Flag is supertoken (True|False)
lIsSuperTokenBool = inGSettings . get ( " ServerDict " , { } ) . get ( " AccessUsers " , { } ) . get ( " AuthTokensDict " , { } ) . get ( inRequest . OpenRPA [ " AuthToken " ] , { } ) . get ( " FlagDoNotExpire " , False )
@ -858,8 +884,27 @@ def WebUserUACHierarchyGet(inRequest):
"""
return inRequest . UserRoleHierarchyGet ( )
## GSettings defs
def GSettingsKeyListValueSet ( inGSettings , inValue , inKeyList = None ) :
from . import SettingsTemplate
GSettings = SettingsTemplate . Create ( inModeStr = " BASIC " )
def GSettingsGet ( inGSettings = None ) :
"""
Get the GSettings from the singleton module .
: param inGSettings : You can pass some GSettings to check if it equal to base gsettings . If not equal - def will merge it
: return : GSettings
"""
global GSettings # identify the global variable
# Merge dictionaries if some new dictionary has come
if inGSettings is not None and GSettings is not inGSettings :
GSettings = Server . __ComplexDictMerge2to1Overwrite__ ( in1Dict = inGSettings , in2Dict = GSettings )
return GSettings # Return the result
def GSettingsKeyListValueSet ( inValue , inKeyList = None , inGSettings = None ) :
"""
Set value in GSettings by the key list
@ -868,6 +913,7 @@ def GSettingsKeyListValueSet(inGSettings, inValue, inKeyList=None):
: param inKeyList :
: return : bool
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inKeyList is None : inKeyList = [ ]
lDict = inGSettings
for lItem2 in inKeyList [ : - 1 ] :
@ -880,7 +926,7 @@ def GSettingsKeyListValueSet(inGSettings, inValue, inKeyList=None):
lDict [ inKeyList [ - 1 ] ] = inValue #Set value
return True
def GSettingsKeyListValueGet ( in GSettings, inKeyList = None ) :
def GSettingsKeyListValueGet ( in KeyList= None , inGSettings = None ) :
"""
Get the value from the GSettings by the key list
@ -888,6 +934,7 @@ def GSettingsKeyListValueGet(inGSettings, inKeyList=None):
: param inKeyList :
: return : value any type
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inKeyList is None : inKeyList = [ ]
lDict = inGSettings
for lItem2 in inKeyList [ : - 1 ] :
@ -899,7 +946,7 @@ def GSettingsKeyListValueGet(inGSettings, inKeyList=None):
lDict = lDict [ lItem2 ]
return lDict . get ( inKeyList [ - 1 ] , None )
def GSettingsKeyListValueAppend ( in GSettings, inValue , inKeyList = None ) :
def GSettingsKeyListValueAppend ( in Value, inKeyList = None , inGSettings = None ) :
"""
Append value in GSettings by the key list
@ -926,6 +973,7 @@ def GSettingsKeyListValueAppend(inGSettings, inValue, inKeyList=None):
: param inKeyList : List of the nested keys ( see example )
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inKeyList is None : inKeyList = [ ]
lDict = inGSettings
for lItem2 in inKeyList [ : - 1 ] :
@ -938,7 +986,7 @@ def GSettingsKeyListValueAppend(inGSettings, inValue, inKeyList=None):
lDict [ inKeyList [ - 1 ] ] . append ( inValue ) #Set value
return True
def GSettingsKeyListValueOperatorPlus ( in GSettings, inValue , inKeyList = None ) :
def GSettingsKeyListValueOperatorPlus ( in Value, inKeyList = None , inGSettings = None ) :
"""
Execute plus operation between 2 lists ( 1 : inValue and 2 : gSettings by the inKeyList )
@ -968,6 +1016,7 @@ def GSettingsKeyListValueOperatorPlus(inGSettings, inValue, inKeyList=None):
: param inKeyList : List of the nested keys ( see example )
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inKeyList is None : inKeyList = [ ]
lDict = inGSettings
for lItem2 in inKeyList [ : - 1 ] :
@ -980,7 +1029,7 @@ def GSettingsKeyListValueOperatorPlus(inGSettings, inValue, inKeyList=None):
lDict [ inKeyList [ - 1 ] ] + = inValue #Set value
return True
def ProcessorAliasDefCreate ( in GSettings, inDef , inAliasStr = None ) :
def ProcessorAliasDefCreate ( in Def, inAliasStr = None , inGSettings = None ) :
"""
Create alias for def ( can be used in ActivityItem in field Def )
! WHEN DEF ALIAS IS REQUIRED ! - Def alias is required when you try to call Python def from the Orchestrator WEB side ( because you can ' t transmit Python def object out of the Python environment)
@ -1004,6 +1053,7 @@ def ProcessorAliasDefCreate(inGSettings, inDef, inAliasStr=None):
: return : str Alias string ( Alias can be regenerated if previous alias was occupied )
"""
#TODO Pay attention - New alias can be used too - need to create more complex algorythm to create new alias!
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lL = inGSettings [ " Logger " ]
if inAliasStr is None : inAliasStr = str ( inDef )
# Check if key is not exists
@ -1013,7 +1063,7 @@ def ProcessorAliasDefCreate(inGSettings, inDef, inAliasStr=None):
inGSettings [ " ProcessorDict " ] [ " AliasDefDict " ] [ inAliasStr ] = inDef
return inAliasStr
def ProcessorAliasDefUpdate ( in GSettings, inDef , inAliasStr ) :
def ProcessorAliasDefUpdate ( in Def, inAliasStr , inGSettings = None ) :
"""
Update alias for def ( can be used in ActivityItem in field Def ) .
! WHEN DEF ALIAS IS REQUIRED ! - Def alias is required when you try to call Python def from the Orchestrator WEB side ( because you can ' t transmit Python def object out of the Python environment)
@ -1036,6 +1086,7 @@ def ProcessorAliasDefUpdate(inGSettings, inDef, inAliasStr):
: param inAliasStr : String alias for associated def
: return : str Alias string
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if callable ( inDef ) : inGSettings [ " ProcessorDict " ] [ " AliasDefDict " ] [ inAliasStr ] = inDef
else : raise Exception ( f " pyOpenRPA Exception: You can ' t use Orchestrator.ProcessorAliasDefUpdate with arg ' inDef ' string value. inDef is ' { inDef } ' , inAliasStr is ' { inAliasStr } ' " )
return inAliasStr
@ -1112,7 +1163,7 @@ def ProcessorActivityItemCreate(inDef, inArgList=None, inArgDict=None, inArgGSet
}
return lActivityItemDict
def ProcessorActivityItemAppend ( inGSettings , inDef = None , inArgList = None , inArgDict = None , inArgGSettingsStr = None , inArgLoggerStr = None , inActivityItemDict = None ) :
def ProcessorActivityItemAppend ( inGSettings = None , inDef = None , inArgList = None , inArgDict = None , inArgGSettingsStr = None , inArgLoggerStr = None , inActivityItemDict = None ) :
"""
Create and add activity item in processor queue .
@ -1160,6 +1211,7 @@ def ProcessorActivityItemAppend(inGSettings, inDef=None, inArgList=None, inArgDi
: param inActivityItemDict : Fill if you already have ActivityItemDict ( don ' t fill inDef, inArgList, inArgDict, inArgGSettingsStr, inArgLoggerStr)
: return ActivityItem GUIDStr
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inActivityItemDict is None :
if inArgList is None : inArgList = [ ]
if inArgDict is None : inArgDict = { }
@ -1344,7 +1396,7 @@ def ProcessListGet(inProcessNameWOExeList=None):
return lResult
def ProcessDefIntervalCall ( in GSettings, in Def, inIntervalSecFloat , inIntervalAsyncBool = False , inDefArgList = None , inDefArgDict = None , inDefArgGSettingsNameStr = None , inDefArgLoggerNameStr = None , inExecuteInNewThreadBool = True , inLogger = None ) :
def ProcessDefIntervalCall ( in Def, inIntervalSecFloat , inIntervalAsyncBool = False , inDefArgList = None , inDefArgDict = None , inDefArgGSettingsNameStr = None , inDefArgLoggerNameStr = None , inExecuteInNewThreadBool = True , inLogger = None , inGSettings = None ) :
"""
Use this procedure if you need to run periodically some def . Set def , args , interval and enjoy : )
@ -1360,6 +1412,8 @@ def ProcessDefIntervalCall(inGSettings, inDef, inIntervalSecFloat, inIntervalAsy
: param inLogger : logging def if some case is appear
: return :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inLogger is None : inLogger = OrchestratorLoggerGet ( )
#Some edits on start
if inDefArgDict is None : inDefArgDict = { }
if inDefArgList is None : inDefArgList = [ ]
@ -1448,6 +1502,7 @@ def PythonStart(inModulePathStr, inDefNameStr, inArgList=None, inArgDict=None, i
: param inLogger : Logger instance to log some information when PythonStart def is running
: return : None
"""
if inLogger is None : inLogger = OrchestratorLoggerGet ( )
if inArgList is None : inArgList = [ ]
if inArgDict is None : inArgDict = { }
try :
@ -1461,7 +1516,7 @@ def PythonStart(inModulePathStr, inDefNameStr, inArgList=None, inArgDict=None, i
# Scheduler
# # # # # # # # # # # # # # # # # # # # # # #
def SchedulerActivityTimeAddWeekly ( in GSettings, in TimeHHMMStr= " 23:55: " , inWeekdayList = None , inActivityList = None ) :
def SchedulerActivityTimeAddWeekly ( in TimeHHMMStr= " 23:55: " , inWeekdayList = None , inActivityList = None , inGSettings = None ) :
"""
Add activity item list in scheduler . You can set weekday list and set time when launch . Activity list will be executed at planned time / day .
@ -1492,6 +1547,7 @@ def SchedulerActivityTimeAddWeekly(inGSettings, inTimeHHMMStr="23:55:", inWeekda
: param inActivityList : Activity list structure
: return : None
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inWeekdayList is None : inWeekdayList = [ 0 , 1 , 2 , 3 , 4 , 5 , 6 ]
if inActivityList is None : inActivityList = [ ]
Processor . __ActivityListVerify__ ( inActivityList = inActivityList ) # DO VERIFICATION FOR THE inActivityList
@ -1607,7 +1663,7 @@ def RDPSessionDublicatesResolve(inGSettings):
#for lItemKeyStr in inGSettings["RobotRDPActive"]["RDPList"]:
# lItemDict = inGSettings["RobotRDPActive"]["RDPList"][lItemKeyStr]
def RDPSessionConnect ( in GSettings, in RDPSessionKeyStr, inRDPTemplateDict = None , inHostStr = None , inPortStr = None , inLoginStr = None , inPasswordStr = None ) :
def RDPSessionConnect ( in RDPSessionKeyStr, inRDPTemplateDict = None , inHostStr = None , inPortStr = None , inLoginStr = None , inPasswordStr = None , inGSettings = None ) :
"""
Create new RDPSession in RobotRDPActive . Attention - activity will be ignored if RDP key is already exists
2 way of the use
@ -1638,6 +1694,7 @@ def RDPSessionConnect(inGSettings, inRDPSessionKeyStr, inRDPTemplateDict=None, i
: param inPasswordStr : Backward compatibility from Orchestrator v 1.1 .20 . Use inRDPTemplateDict
: return : True every time : )
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP def was called not from processor queue - activity will be append in the processor queue. " )
@ -1666,7 +1723,7 @@ def RDPSessionConnect(inGSettings, inRDPSessionKeyStr, inRDPTemplateDict=None, i
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP session was not created because it is alredy exists in the RDPList. Use RDPSessionReconnect if you want to update RDP configuration. " )
return True
def RDPSessionDisconnect ( in GSettings, in RDPSessionKeyStr, inBreakTriggerProcessWOExeList = None ) :
def RDPSessionDisconnect ( in RDPSessionKeyStr, inBreakTriggerProcessWOExeList = None , inGSettings = None ) :
"""
Disconnect the RDP session and stop monitoring it .
@ -1689,6 +1746,7 @@ def RDPSessionDisconnect(inGSettings, inRDPSessionKeyStr, inBreakTriggerProcessW
Orchestrator look processes on the current machine
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inBreakTriggerProcessWOExeList is None : inBreakTriggerProcessWOExeList = [ ]
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
@ -1713,7 +1771,7 @@ def RDPSessionDisconnect(inGSettings, inRDPSessionKeyStr, inBreakTriggerProcessW
Connector . SystemRDPWarningClickOk ( ) # Click all warning messages
return True
def RDPSessionReconnect ( in GSettings, in RDPSessionKeyStr, inRDPTemplateDict = None ) :
def RDPSessionReconnect ( in RDPSessionKeyStr, inRDPTemplateDict = None , inGSettings = None ) :
"""
Reconnect the RDP session
@ -1737,6 +1795,7 @@ def RDPSessionReconnect(inGSettings, inRDPSessionKeyStr, inRDPTemplateDict=None)
: param inRDPTemplateDict : RDP configuration dict with settings ( see def Orchestrator . RDPTemplateCreate )
: return :
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP def was called not from processor queue - activity will be append in the processor queue. " )
@ -1759,7 +1818,7 @@ def RDPSessionReconnect(inGSettings, inRDPSessionKeyStr, inRDPTemplateDict=None)
Connector . Session ( lRDPConfigurationItem )
return True
def RDPSessionMonitorStop ( in GSettings, inRDPSessionKeyStr ) :
def RDPSessionMonitorStop ( in RDPSessionKeyStr, inGSettings = None ) :
"""
Stop monitoring the RDP session by the Orchestrator process . Current def don ' t kill RDP session - only stop to track it (it can give )
@ -1777,11 +1836,12 @@ def RDPSessionMonitorStop(inGSettings, inRDPSessionKeyStr):
: param inRDPSessionKeyStr : RDP Session string key - need for the further identification
: return : True every time : >
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lResult = True
inGSettings [ " RobotRDPActive " ] [ " RDPList " ] . pop ( inRDPSessionKeyStr , None ) # Remove item from RDPList
return lResult
def RDPSessionLogoff ( in GSettings, in RDPSessionKeyStr, inBreakTriggerProcessWOExeList = None ) :
def RDPSessionLogoff ( in RDPSessionKeyStr, inBreakTriggerProcessWOExeList = None , inGSettings = None ) :
"""
Logoff the RDP session from the Orchestrator process ( close all apps in session when logoff )
@ -1801,6 +1861,7 @@ def RDPSessionLogoff(inGSettings, inRDPSessionKeyStr, inBreakTriggerProcessWOExe
: param inBreakTriggerProcessWOExeList : List of the processes , which will stop the execution . Example [ " notepad " ]
: return : True - logoff is successful
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
if inBreakTriggerProcessWOExeList is None : inBreakTriggerProcessWOExeList = [ ]
lResult = True
# Check thread
@ -1828,7 +1889,7 @@ def RDPSessionLogoff(inGSettings, inRDPSessionKeyStr, inBreakTriggerProcessWOExe
inGSettings [ " RobotRDPActive " ] [ " RDPList " ] . pop ( inRDPSessionKeyStr , None ) # Remove item from RDPList
return lResult
def RDPSessionResponsibilityCheck ( in GSettings, inRDPSessionKeyStr ) :
def RDPSessionResponsibilityCheck ( in RDPSessionKeyStr, inGSettings = None ) :
"""
DEVELOPING , MAYBE NOT USEFUL Check RDP Session responsibility TODO NEED DEV + TEST
@ -1836,6 +1897,7 @@ def RDPSessionResponsibilityCheck(inGSettings, inRDPSessionKeyStr):
: param inRDPSessionKeyStr : RDP Session string key - need for the further identification
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP def was called not from processor queue - activity will be append in the processor queue. " )
@ -1871,7 +1933,7 @@ def RDPSessionResponsibilityCheck(inGSettings, inRDPSessionKeyStr):
lDoCheckResponsibilityCountCurrent + = 1
return True
def RDPSessionProcessStartIfNotRunning ( in GSettings, in RDPSessionKeyStr, inProcessNameWEXEStr , inFilePathStr , inFlagGetAbsPathBool = Tru e) :
def RDPSessionProcessStartIfNotRunning ( in RDPSessionKeyStr, inProcessNameWEXEStr , inFilePathStr , inFlagGetAbsPathBool = Tru e, inGSettings = Non e) :
"""
Start process in RDP if it is not running ( check by the arg inProcessNameWEXEStr )
@ -1895,6 +1957,7 @@ def RDPSessionProcessStartIfNotRunning(inGSettings, inRDPSessionKeyStr, inProces
: param inFlagGetAbsPathBool : True - get abs path from the relative path in inFilePathStr . False - else case
: return : True every time : )
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
lResult = True
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
@ -1917,7 +1980,7 @@ def RDPSessionProcessStartIfNotRunning(inGSettings, inRDPSessionKeyStr, inProces
inRDPConfigurationItem = inGSettings [ " RobotRDPActive " ] [ " RDPList " ] [ inRDPSessionKeyStr ] )
return lResult
def RDPSessionCMDRun ( in GSettings, in RDPSessionKeyStr, inCMDStr , inModeStr = " CROSSCHECK " ) :
def RDPSessionCMDRun ( in RDPSessionKeyStr, inCMDStr , inModeStr = " CROSSCHECK " , inGSettings = None ) :
"""
Send CMD command to the RDP session " RUN " window
@ -1945,6 +2008,7 @@ def RDPSessionCMDRun(inGSettings, inRDPSessionKeyStr, inCMDStr, inModeStr="CROSS
" IsResponsibleBool " : True | False # Flag is RDP is responsible - works only when inModeStr = CROSSCHECK
}
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
lResult = {
" OutStr " : None , # Result string
" IsResponsibleBool " : False # Flag is RDP is responsible - works only when inModeStr = CROSSCHECK
@ -1970,7 +2034,7 @@ def RDPSessionCMDRun(inGSettings, inRDPSessionKeyStr, inCMDStr, inModeStr="CROSS
inRDPConfigurationItem = inGSettings [ " RobotRDPActive " ] [ " RDPList " ] [ inRDPSessionKeyStr ] )
return lResult
def RDPSessionProcessStop ( in GSettings, in RDPSessionKeyStr, inProcessNameWEXEStr , inFlagForceCloseBool ) :
def RDPSessionProcessStop ( in RDPSessionKeyStr, inProcessNameWEXEStr , inFlagForceCloseBool , inGSettings = None ) :
"""
Send CMD command to the RDP session " RUN " window .
@ -1992,6 +2056,7 @@ def RDPSessionProcessStop(inGSettings, inRDPSessionKeyStr, inProcessNameWEXEStr,
: param inFlagForceCloseBool : True - force close the process . False - safe close the process
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP def was called not from processor queue - activity will be append in the processor queue. " )
@ -2015,7 +2080,7 @@ def RDPSessionProcessStop(inGSettings, inRDPSessionKeyStr, inProcessNameWEXEStr,
Connector . SessionCMDRun ( inSessionHex = lSessionHex , inCMDCommandStr = lCMDStr , inModeStr = " CROSSCHECK " , inLogger = inGSettings [ " Logger " ] , inRDPConfigurationItem = inGSettings [ " RobotRDPActive " ] [ " RDPList " ] [ inRDPSessionKeyStr ] )
return lResult
def RDPSessionFileStoredSend ( in GSettings, in RDPSessionKeyStr, inHostFilePathStr , inRDPFilePathStr ) :
def RDPSessionFileStoredSend ( in RDPSessionKeyStr, inHostFilePathStr , inRDPFilePathStr , inGSettings = None ) :
"""
Send file from Orchestrator session to the RDP session using shared drive in RDP ( see RDP Configuration Dict , Shared drive )
@ -2037,6 +2102,7 @@ def RDPSessionFileStoredSend(inGSettings, inRDPSessionKeyStr, inHostFilePathStr,
: param inRDPFilePathStr : ! Absolute ! path to the destination file location on the RDP side . Example : " C: \\ RPA \\ TESTDIR \\ Test.py "
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP def was called not from processor queue - activity will be append in the processor queue. " )
@ -2059,7 +2125,7 @@ def RDPSessionFileStoredSend(inGSettings, inRDPSessionKeyStr, inHostFilePathStr,
Connector . SessionCMDRun ( inSessionHex = lSessionHex , inCMDCommandStr = lCMDStr , inModeStr = " LISTEN " , inClipboardTimeoutSec = 120 , inLogger = inGSettings [ " Logger " ] , inRDPConfigurationItem = inGSettings [ " RobotRDPActive " ] [ " RDPList " ] [ inRDPSessionKeyStr ] )
return lResult
def RDPSessionFileStoredRecieve ( in GSettings, in RDPSessionKeyStr, inRDPFilePathStr , inHostFilePathStr ) :
def RDPSessionFileStoredRecieve ( in RDPSessionKeyStr, inRDPFilePathStr , inHostFilePathStr , inGSettings = None ) :
"""
Recieve file from RDP session to the Orchestrator session using shared drive in RDP ( see RDP Configuration Dict , Shared drive )
@ -2081,6 +2147,7 @@ def RDPSessionFileStoredRecieve(inGSettings, inRDPSessionKeyStr, inRDPFilePathSt
: param inHostFilePathStr : Relative or absolute path to the file location on the Orchestrator side . Example : " TESTDIR \\ Test.py "
: return : True every time
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
# Check thread
if not Core . IsProcessorThread ( inGSettings = inGSettings ) :
if inGSettings [ " Logger " ] : inGSettings [ " Logger " ] . warning ( f " RDP def was called not from processor queue - activity will be append in the processor queue. " )
@ -2106,13 +2173,14 @@ def RDPSessionFileStoredRecieve(inGSettings, inRDPSessionKeyStr, inRDPFilePathSt
# # # # # Start orchestrator
# # # # # # # # # # # # # # # # # # # # # # #
def GSettingsAutocleaner ( inGSettings ) :
def GSettingsAutocleaner ( inGSettings = None ) :
"""
HIDDEN Interval gSettings auto cleaner def to clear some garbage .
: param inGSettings : Global settings dict ( singleton )
: return : None
"""
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
while True :
time . sleep ( inGSettings [ " Autocleaner " ] [ " IntervalSecFloat " ] ) # Wait for the next iteration
lL = inGSettings [ " Logger " ]
@ -2140,13 +2208,32 @@ def GSettingsAutocleaner(inGSettings):
from . . import __version__ # Get version from the package
def Orchestrator ( inGSettings , inDumpRestoreBool = True , inRunAsAdministratorBool = True ) :
def Start ( inDumpRestoreBool = True , inRunAsAdministratorBool = True ) :
"""
Start the orchestrator threads execution
: param inDumpRestoreBool : True - restore data from the dumo
: param inRunAsAdministratorBool : True - rerun as admin if not
: return :
"""
Orchestrator ( inDumpRestoreBool = True , inRunAsAdministratorBool = True )
def Orchestrator ( inGSettings = None , inDumpRestoreBool = True , inRunAsAdministratorBool = True ) :
"""
Main def to start orchestrator
: param inGSettings :
: param inDumpRestoreBool :
: param inRunAsAdministratorBool :
: return :
"""
lL = inGSettings [ " Logger " ]
# https://stackoverflow.com/questions/130763/request-uac-elevation-from-within-a-python-script
if not OrchestratorIsAdmin ( ) and inRunAsAdministratorBool == True :
OrchestratorRerunAsAdmin ( )
else :
# Code of your program here
inGSettings = GSettingsGet ( inGSettings = inGSettings ) # Set the global settings
#mGlobalDict = Settings.Settings(sys.argv[1])
global gSettingsDict
gSettingsDict = inGSettings # Alias for old name in alg