import json
from inspect import signature # For detect count of def args
#ControlPanelDict
from desktopmagic . screengrab_win32 import (
getDisplayRects , saveScreenToBmp , saveRectToBmp , getScreenAsImage ,
getRectAsImage , getDisplaysAsImages )
from http import cookies
import uuid # generate UUID4
import time # sleep functions
import datetime # datetime functions
import threading # Multi-threading
from . Web import Basic
from . import BackwardCompatibility # Support old up to 1.2.0 defs
from . import Processor
# # # # # # # # # # # #
# v 1.2.0 Functionallity
# # # # # # # # # # # #
# Generate CP
# Return {"Key":{"",""}}
def HiddenCPDictGenerate ( inRequest , inGSettings ) :
lL = inGSettings [ " Logger " ] # Alias for logger
# Create result JSON
lCPDict = { }
lRenderFunctionsRobotList = inGSettings [ " ControlPanelDict " ] [ " RobotList " ]
for lItem in lRenderFunctionsRobotList :
lUACBool = True # Check if render function is applicable User Access Rights (UAC)
if inGSettings [ " Server " ] [ " AccessUsers " ] [ " FlagCredentialsAsk " ] is True :
lUserRights = inGSettings [ " Server " ] [ " AccessUsers " ] [ " RuleDomainUserDict " ] [ ( inRequest . OpenRPA [ " Domain " ] . upper ( ) , inRequest . OpenRPA [ " User " ] . upper ( ) ) ]
if len ( lUserRights [ " ControlPanelKeyAllowedList " ] ) > 0 and lItem [ " KeyStr " ] not in lUserRights [ " ControlPanelKeyAllowedList " ] :
lUACBool = False # UAC Check is not passed - False for user
if lUACBool : # Run function if UAC is TRUE
# Выполнить вызов и записать результат
# Call def (inRequest, inGSettings) or def (inGSettings)
lItemResultDict = None
lDEFSignature = signature ( lItem [ " RenderFunction " ] ) # Get signature of the def
lDEFARGLen = len ( lDEFSignature . parameters . keys ( ) ) # get count of the def args
try :
if lDEFARGLen == 1 : # def (inGSettings)
lItemResultDict = lItem [ " RenderFunction " ] ( inGSettings )
elif lDEFARGLen == 2 : # def (inRequest, inGSettings)
lItemResultDict = lItem [ " RenderFunction " ] ( inRequest , inGSettings )
elif lDEFARGLen == 0 : # def ()
lItemResultDict = lItem [ " RenderFunction " ] ( )
# RunFunction
# lResultJSON["RenderRobotList"].append(lItemResultDict)
# Backward compatibility up to 1.2.0 - call HTML generator if result has no "HTMLStr"
if " HTMLStr " in lItemResultDict or " DataDict " in lItemResultDict :
lCPDict [ lItem [ " KeyStr " ] ] = lItemResultDict # new version
else :
# Call backward compatibility HTML generator
lCPDict [ lItem [ " KeyStr " ] ] = { " HTMLStr " : Basic . HTMLControlPanelBC ( inCPDict = lItemResultDict ) , " DataDict " : { } }
except Exception as e :
if lL : lL . exception ( f " Error in control panel. CP item { lItem } . Exception is below " )
return lCPDict
# Return {"Key":{"",""}}
def HiddenRDPDictGenerate ( inRequest , inGSettings ) :
lRDPDict = { }
# Iterate throught the RDP list
for lRDPSessionKeyStrItem in inGSettings [ " RobotRDPActive " ] [ " RDPList " ] :
lRDPConfiguration = inGSettings [ " RobotRDPActive " ] [ " RDPList " ] [
lRDPSessionKeyStrItem ] # Get the configuration dict
lDataItemDict = { " SessionKeyStr " : " " , " SessionHexStr " : " " , " IsFullScreenBool " : False ,
" IsIgnoredBool " : False } # Template
lDataItemDict [ " SessionKeyStr " ] = lRDPSessionKeyStrItem # Session key str
lDataItemDict [ " SessionHexStr " ] = lRDPConfiguration [ " SessionHex " ] # Session Hex
lDataItemDict [ " IsFullScreenBool " ] = True if lRDPSessionKeyStrItem == inGSettings [ " RobotRDPActive " ] [
" FullScreenRDPSessionKeyStr " ] else False # Check the full screen for rdp window
lDataItemDict [ " IsIgnoredBool " ] = lRDPConfiguration [ " SessionIsIgnoredBool " ] # Is ignored
lRDPDict [ lDataItemDict [ " SessionKeyStr " ] ] . append ( lDataItemDict )
return lRDPDict
#v1.2.0 Send data container to the client from the server
# /pyOpenRPA/ServerData return {"HashStr" , "ServerDataDict": {"CPKeyStr":{"HTMLStr":"", DataDict:{}}}}
# Client: mGlobal.pyOpenRPA.ServerDataHashStr
# Client: mGlobal.pyOpenRPA.ServerDataDict
def pyOpenRPA_ServerData ( inRequest , inGSettings ) :
# Extract the hash value from request
lValueStr = None
if inRequest . headers . get ( ' Content-Length ' ) is not None :
lInputByteArrayLength = int ( inRequest . headers . get ( ' Content-Length ' ) )
lInputByteArray = inRequest . rfile . read ( lInputByteArrayLength )
# Превращение массива байт в объект
lValueStr = ( lInputByteArray . decode ( ' utf8 ' ) )
# Generate ServerDataDict
lFlagDoGenerateBool = True
while lFlagDoGenerateBool :
lServerDataDict = {
" CPDict " : HiddenCPDictGenerate ( inRequest = inRequest , inGSettings = inGSettings ) ,
" RDPDict " : HiddenRDPDictGenerate ( inRequest = inRequest , inGSettings = inGSettings ) ,
" UserDict " : { " UACClientDict " : inRequest . OpenRPA [ " DefUserRoleHierarchyGet " ] ( ) }
}
# Create JSON
lServerDataDictJSONStr = json . dumps ( lServerDataDict )
# Generate hash
lServerDataHashStr = str ( hash ( lServerDataDictJSONStr ) )
if lValueStr != lServerDataHashStr and lServerDataHashStr != " " and lServerDataHashStr != None : # Case if Hash is not equal
lFlagDoGenerateBool = False
else : # Case Hashes are equal
time . sleep ( inGSettings [ " Client " ] [ " Session " ] [ " ControlPanelRefreshIntervalSecFloat " ] )
# Return the result if Hash is changed
lResult = { " HashStr " : lServerDataHashStr , " ServerDataDict " : lServerDataDict }
inResponseDict = inRequest . OpenRPAResponseDict
# Send message back to client
message = json . dumps ( lResult )
# Write content as utf-8 data
inResponseDict [ " Body " ] = bytes ( message , " utf8 " )
return lResult
#v1.2.0 Send data container to the client from the server
# /pyOpenRPA/ServerLog return {"HashStr" , "ServerLogList": ["row 1", "row 2"]}
# Client: mGlobal.pyOpenRPA.ServerLogListHashStr
# Client: mGlobal.pyOpenRPA.ServerLogList
def pyOpenRPA_ServerLog ( inRequest , inGSDict ) :
# Extract the hash value from request
lValueStr = None
if inRequest . headers . get ( ' Content-Length ' ) is not None :
lInputByteArrayLength = int ( inRequest . headers . get ( ' Content-Length ' ) )
lInputByteArray = inRequest . rfile . read ( lInputByteArrayLength )
# Превращение массива байт в объект
lValueStr = ( lInputByteArray . decode ( ' utf8 ' ) )
# Generate ServerDataDict
lFlagDoGenerateBool = True
while lFlagDoGenerateBool :
lServerLogList = inGSDict [ " Client " ] [ " DumpLogList " ]
# Get hash
lServerLogListHashStr = inGSDict [ " Client " ] [ " DumpLogListHashStr " ]
if lValueStr != lServerLogListHashStr and lServerLogListHashStr != " " and lServerLogListHashStr != None : # Case if Hash is not equal Fix because None can be obtained without JSON decode
lFlagDoGenerateBool = False
else : # Case Hashes are equal
time . sleep ( inGSDict [ " Client " ] [ " DumpLogListRefreshIntervalSecFloat " ] )
# Return the result if Hash is changed
lResult = { " HashStr " : lServerLogListHashStr , " ServerLogList " : lServerLogList }
inResponseDict = inRequest . OpenRPAResponseDict
# Send message back to client
message = json . dumps ( lResult )
# Write content as utf-8 data
inResponseDict [ " Body " ] = bytes ( message , " utf8 " )
return lResult
def pyOpenRPA_Screenshot ( inRequest , inGlobalDict ) :
# Get Screenshot
def SaveScreenshot ( inFilePath ) :
# grab fullscreen
# Save the entire virtual screen as a PNG
lScreenshot = getScreenAsImage ( )
lScreenshot . save ( ' screenshot.png ' , format = ' png ' )
# lScreenshot = ScreenshotSecondScreen.grab_screen()
# save image file
# lScreenshot.save('screenshot.png')
# Сохранить файл на диск
SaveScreenshot ( " Screenshot.png " )
lFileObject = open ( " Screenshot.png " , " rb " )
# Write content as utf-8 data
inRequest . OpenRPAResponseDict [ " Body " ] = lFileObject . read ( )
# Закрыть файловый объект
lFileObject . close ( )
# Add activity item or activity list to the processor queue
# Body is Activity item or Activity List
def pyOpenRPA_Processor ( inRequest , inGSettings ) :
# Recieve the data
lValueStr = None
if inRequest . headers . get ( ' Content-Length ' ) is not None :
lInputByteArrayLength = int ( inRequest . headers . get ( ' Content-Length ' ) )
lInputByteArray = inRequest . rfile . read ( lInputByteArrayLength )
# Превращение массива байт в объект
lInput = json . loads ( lInputByteArray . decode ( ' utf8 ' ) )
# If list - operator plus
if type ( lInput ) is list :
inGSettings [ " ProcessorDict " ] [ " ActivityList " ] + = lInput
else :
inGSettings [ " ProcessorDict " ] [ " ActivityList " ] . append ( lInput )
# Execute activity list
def pyOpenRPA_ActivityListExecute ( inRequest , inGSettings ) :
# Recieve the data
lValueStr = None
if inRequest . headers . get ( ' Content-Length ' ) is not None :
lInputByteArrayLength = int ( inRequest . headers . get ( ' Content-Length ' ) )
lInputByteArray = inRequest . rfile . read ( lInputByteArrayLength )
# Превращение массива байт в объект
lInput = json . loads ( lInputByteArray . decode ( ' utf8 ' ) )
# If list - operator plus
if type ( lInput ) is list :
lResultList = Processor . ActivityListExecute ( inGSettings = inGSettings , inActivityList = lInput )
inRequest . OpenRPAResponseDict [ " Body " ] = bytes ( json . dumps ( lResultList ) , " utf8 " )
else :
lResultList = Processor . ActivityListExecute ( inGSettings = inGSettings , inActivityList = [ lInput ] )
inRequest . OpenRPAResponseDict [ " Body " ] = bytes ( json . dumps ( lResultList [ 0 ] ) , " utf8 " )
# See docs in Agent (pyOpenRPA.Agent.O2A)
def pyOpenRPA_Agent_O2A ( inRequest , inGSettings ) :
# Test solution
while True :
time . sleep ( 3 )
# See docs in Agent (pyOpenRPA.Agent.A2O)
def pyOpenRPA_Agent_A2O ( inRequest , inGSettings ) :
# Recieve the data
lValueStr = None
if inRequest . headers . get ( ' Content-Length ' ) is not None :
lInputByteArrayLength = int ( inRequest . headers . get ( ' Content-Length ' ) )
lInputByteArray = inRequest . rfile . read ( lInputByteArrayLength )
print ( lInputByteArray )
# Превращение массива байт в объект
lInput = json . loads ( lInputByteArray . decode ( ' utf8 ' ) )
if " LogList " in lInput :
for lLogItemStr in lInput [ " LogList " ] :
inGSettings [ " Logger " ] . info ( lLogItemStr )
def SettingsUpdate ( inGlobalConfiguration ) :
import os
import pyOpenRPA . Orchestrator
lOrchestratorFolder = " \\ " . join ( pyOpenRPA . Orchestrator . __file__ . split ( " \\ " ) [ : - 1 ] )
lURLList = \
[ #List of available URLs with the orchestrator server
#{
# "Method":"GET|POST",
# "URL": "/index", #URL of the request
# "MatchType": "", #"BeginWith|Contains|Equal|EqualCase",
# "ResponseFilePath": "", #Absolute or relative path
# "ResponseFolderPath": "", #Absolute or relative path
# "ResponseContentType": "", #HTTP Content-type
# "ResponseDefRequestGlobal": None #Function with str result
#}
#Orchestrator basic dependencies
{ " Method " : " GET " , " URL " : " / " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " Web \\ Index.xhtml " ) , " ResponseContentType " : " text/html " } ,
{ " Method " : " GET " , " URL " : " /Index.js " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " Web \\ Index.js " ) , " ResponseContentType " : " text/javascript " } ,
{ " Method " : " GET " , " URL " : " /3rdParty/Semantic-UI-CSS-master/semantic.min.css " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ Semantic-UI-CSS-master \\ semantic.min.css " ) , " ResponseContentType " : " text/css " } ,
{ " Method " : " GET " , " URL " : " /3rdParty/Semantic-UI-CSS-master/semantic.min.js " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ Semantic-UI-CSS-master \\ semantic.min.js " ) , " ResponseContentType " : " application/javascript " } ,
{ " Method " : " GET " , " URL " : " /3rdParty/jQuery/jquery-3.1.1.min.js " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ jQuery \\ jquery-3.1.1.min.js " ) , " ResponseContentType " : " application/javascript " } ,
{ " Method " : " GET " , " URL " : " /3rdParty/Google/LatoItalic.css " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ Google \\ LatoItalic.css " ) , " ResponseContentType " : " font/css " } ,
{ " Method " : " GET " , " URL " : " /3rdParty/Semantic-UI-CSS-master/themes/default/assets/fonts/icons.woff2 " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ Semantic-UI-CSS-master \\ themes \\ default \\ assets \\ fonts \\ icons.woff2 " ) , " ResponseContentType " : " font/woff2 " } ,
{ " Method " : " GET " , " URL " : " /favicon.ico " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " Web \\ favicon.ico " ) , " ResponseContentType " : " image/x-icon " } ,
{ " Method " : " GET " , " URL " : " /3rdParty/Handlebars/handlebars-v4.1.2.js " , " MatchType " : " EqualCase " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ Handlebars \\ handlebars-v4.1.2.js " ) , " ResponseContentType " : " application/javascript " } ,
{ " Method " : " GET " , " URL " : " /Monitor/ControlPanelDictGet " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : BackwardCompatibility . v1_2_0_Monitor_ControlPanelDictGet_SessionCheckInit , " ResponseContentType " : " application/json " } ,
{ " Method " : " GET " , " URL " : " /GetScreenshot " , " MatchType " : " BeginWith " , " ResponseDefRequestGlobal " : pyOpenRPA_Screenshot , " ResponseContentType " : " image/png " } ,
{ " Method " : " GET " , " URL " : " /pyOpenRPA_logo.png " , " MatchType " : " Equal " , " ResponseFilePath " : os . path . join ( lOrchestratorFolder , " .. \\ Resources \\ Web \\ pyOpenRPA_logo.png " ) , " ResponseContentType " : " image/png " } ,
{ " Method " : " POST " , " URL " : " /Orchestrator/UserRoleHierarchyGet " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : BackwardCompatibility . v1_2_0_UserRoleHierarchyGet , " ResponseContentType " : " application/json " } ,
# New way of the v.1.2.0 functionallity (all defs by the URL from /pyOpenRPA/...)
{ " Method " : " POST " , " URL " : " /pyOpenRPA/ServerData " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_ServerData , " ResponseContentType " : " application/json " } ,
{ " Method " : " POST " , " URL " : " /pyOpenRPA/ServerLog " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_ServerLog , " ResponseContentType " : " application/json " } ,
{ " Method " : " GET " , " URL " : " /pyOpenRPA/Screenshot " , " MatchType " : " BeginWith " , " ResponseDefRequestGlobal " : pyOpenRPA_Screenshot , " ResponseContentType " : " image/png " } ,
{ " Method " : " POST " , " URL " : " /pyOpenRPA/Processor " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_Processor , " ResponseContentType " : " application/json " } ,
{ " Method " : " POST " , " URL " : " /pyOpenRPA/ActivityListExecute " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_ActivityListExecute , " ResponseContentType " : " application/json " } ,
{ " Method " : " POST " , " URL " : " /pyOpenRPA/Agent/O2A " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_Agent_O2A , " ResponseContentType " : " application/json " } ,
{ " Method " : " POST " , " URL " : " /pyOpenRPA/Agent/A2O " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_Agent_A2O , " ResponseContentType " : " application/json " } ,
]
inGlobalConfiguration [ " Server " ] [ " URLList " ] = inGlobalConfiguration [ " Server " ] [ " URLList " ] + lURLList
return inGlobalConfiguration