You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
271 lines
17 KiB
271 lines
17 KiB
import json
|
|
import copy
|
|
from inspect import signature # For detect count of def args
|
|
#ControlPanelDict
|
|
from desktopmagic.screengrab_win32 import (
|
|
getDisplayRects, saveScreenToBmp, saveRectToBmp, getScreenAsImage,
|
|
getRectAsImage, getDisplaysAsImages)
|
|
from http import cookies
|
|
import uuid # generate UUID4
|
|
import time # sleep functions
|
|
import datetime # datetime functions
|
|
import threading # Multi-threading
|
|
from .Web import Basic
|
|
from . import BackwardCompatibility # Support old up to 1.2.0 defs
|
|
from . import Processor
|
|
# # # # # # # # # # # #
|
|
# v 1.2.0 Functionality
|
|
# # # # # # # # # # # #
|
|
# Generate CP
|
|
# Return {"Key":{"",""}}
|
|
def HiddenCPDictGenerate(inRequest, inGSettings):
    """Render every control panel item the requesting user is allowed to see.

    Iterates over ``inGSettings["ControlPanelDict"]["RobotList"]``. Each item
    provides a ``"KeyStr"`` and a ``"RenderFunction"``; the render function may
    be declared as ``def ()``, ``def (inGSettings)`` or
    ``def (inRequest, inGSettings)``.

    :param inRequest: request object; ``inRequest.OpenRPA["Domain"]`` and
        ``inRequest.OpenRPA["User"]`` are read when credentials are required
    :param inGSettings: orchestrator global settings dict
    :return: dict ``{KeyStr: render result}``. Results without "HTMLStr" or
        "DataDict" are wrapped through the backward compatible HTML generator.
        Items whose render step raises are logged and left out of the result.
    """
    lLogger = inGSettings["Logger"]  # May be falsy - then nothing is logged
    lResultDict = {}
    for lCPItem in inGSettings["ControlPanelDict"]["RobotList"]:
        # User access check (UAC): skip the item when the user has a non-empty
        # allow-list which does not contain the item key
        if inGSettings["Server"]["AccessUsers"]["FlagCredentialsAsk"] is True:
            lUserKeyTuple = (inRequest.OpenRPA["Domain"].upper(), inRequest.OpenRPA["User"].upper())
            lUserRuleDict = inGSettings["Server"]["AccessUsers"]["RuleDomainUserDict"][lUserKeyTuple]
            if len(lUserRuleDict["ControlPanelKeyAllowedList"]) > 0 \
                    and lCPItem["KeyStr"] not in lUserRuleDict["ControlPanelKeyAllowedList"]:
                continue
        # Pick the call form by the number of declared arguments
        lRenderDef = lCPItem["RenderFunction"]
        lArgCountInt = len(signature(lRenderDef).parameters)
        try:
            lRenderResultDict = None
            if lArgCountInt == 0:  # def ()
                lRenderResultDict = lRenderDef()
            elif lArgCountInt == 1:  # def (inGSettings)
                lRenderResultDict = lRenderDef(inGSettings)
            elif lArgCountInt == 2:  # def (inRequest, inGSettings)
                lRenderResultDict = lRenderDef(inRequest, inGSettings)
            if "HTMLStr" in lRenderResultDict or "DataDict" in lRenderResultDict:
                # New style result (v1.2.0+) is passed through as is
                lResultDict[lCPItem["KeyStr"]] = lRenderResultDict
            else:
                # Old style result (up to 1.2.0): generate HTML from the dict
                lResultDict[lCPItem["KeyStr"]] = {
                    "HTMLStr": Basic.HTMLControlPanelBC(inCPDict=lRenderResultDict),
                    "DataDict": {}
                }
        except Exception:
            # One broken panel must not break the whole response
            if lLogger:
                lLogger.exception(f"Error in control panel. CP item {lCPItem}. Exception is below")
    return lResultDict
|
|
|
|
# Return {"Key":{"",""}}
|
|
def HiddenRDPDictGenerate(inRequest, inGSettings):
    """Build the RDP session dictionary which is sent to the client.

    :param inRequest: request object (not used here)
    :param inGSettings: orchestrator global settings dict; reads
        ``inGSettings["RobotRDPActive"]["RDPList"]`` and
        ``inGSettings["RobotRDPActive"]["FullScreenRDPSessionKeyStr"]``
    :return: dict ``{SessionKeyStr: item, ..., "HandlebarsList": [item copy, ...]}``
        where every item is ``{"SessionKeyStr", "SessionHexStr",
        "IsFullScreenBool", "IsIgnoredBool"}``
    """
    lRDPDict = {"HandlebarsList": []}
    lRDPActiveDict = inGSettings["RobotRDPActive"]
    # Iterate through the RDP list
    for lRDPSessionKeyStrItem in lRDPActiveDict["RDPList"]:
        lRDPConfiguration = lRDPActiveDict["RDPList"][lRDPSessionKeyStrItem]  # Get the configuration dict
        lDataItemDict = {
            "SessionKeyStr": lRDPSessionKeyStrItem,  # Session key str
            "SessionHexStr": lRDPConfiguration["SessionHex"],  # Session Hex
            # Check the full screen for rdp window
            "IsFullScreenBool": lRDPSessionKeyStrItem == lRDPActiveDict["FullScreenRDPSessionKeyStr"],
            "IsIgnoredBool": lRDPConfiguration["SessionIsIgnoredBool"],  # Is ignored
        }
        # Store the item under its session key. The previous code called
        # .append() on a key which was never created and raised KeyError.
        lRDPDict[lDataItemDict["SessionKeyStr"]] = lDataItemDict
        # Handlebars gets an independent copy of the item
        lRDPDict["HandlebarsList"].append(copy.deepcopy(lDataItemDict))
    return lRDPDict
|
|
|
|
# Return {"HostNameUpperStr;UserUpperStr":{"IsListenBool":True}, "HandlebarsList":[{"HostnameUpperStr":"","UserUpperStr":"","IsListenBool":True}]}
|
|
def HiddenAgentDictGenerate(inRequest, inGSettings):
    """Build the agent dictionary which is sent to the client.

    :param inRequest: request object (not used here)
    :param inGSettings: orchestrator global settings dict; reads
        ``inGSettings["AgentDict"]`` whose keys are tuples
        ``(HostNameUpperStr, UserUpperStr)``
    :return: dict ``{"HostNameUpperStr;UserUpperStr": agent item, ...,
        "HandlebarsList": [agent item copy + "HostnameUpperStr" + "UserUpperStr", ...]}``
    """
    lResultDict = {"HandlebarsList": []}
    # Iterate through the agent dict
    for lAgentKeyTuple, lAgentItemDict in inGSettings["AgentDict"].items():
        lHostStr = lAgentKeyTuple[0]
        lUserStr = lAgentKeyTuple[1]
        # Tuple key is not JSON friendly - join it into "Host;User"
        lResultDict[f"{lHostStr};{lUserStr}"] = lAgentItemDict
        # The Handlebars row is a deep copy, so the extra fields do not leak
        # into the settings item
        lRowDict = copy.deepcopy(lAgentItemDict)
        lRowDict["HostnameUpperStr"] = lHostStr
        lRowDict["UserUpperStr"] = lUserStr
        lResultDict["HandlebarsList"].append(lRowDict)
    return lResultDict
|
|
|
|
|
|
#v1.2.0 Send data container to the client from the server
|
|
# /pyOpenRPA/ServerData return {"HashStr" , "ServerDataDict": {"CPKeyStr":{"HTMLStr":"", DataDict:{}}}}
|
|
|
|
# Client: mGlobal.pyOpenRPA.ServerDataHashStr
|
|
# Client: mGlobal.pyOpenRPA.ServerDataDict
|
|
def pyOpenRPA_ServerData(inRequest,inGSettings):
    """Long-poll handler: answer when the server data differs from the client hash.

    The request body (if any) is the hash string the client already has. The
    server data is regenerated in a loop until its hash is different from the
    client one; between the attempts the def sleeps for
    ``inGSettings["Client"]["Session"]["ControlPanelRefreshIntervalSecFloat"]``.

    :param inRequest: request object; reads ``headers``, ``rfile``,
        ``OpenRPA["DefUserRoleHierarchyGet"]`` and writes
        ``OpenRPAResponseDict["Body"]``
    :param inGSettings: orchestrator global settings dict
    :return: dict ``{"HashStr": ..., "ServerDataDict": {...}}`` (the same
        content is written to the response body as utf8 JSON)
    """
    # Extract the hash value from request
    lClientHashStr = None
    lContentLengthStr = inRequest.headers.get('Content-Length')
    if lContentLengthStr is not None:
        # Convert the body bytes to the str
        lClientHashStr = inRequest.rfile.read(int(lContentLengthStr)).decode('utf8')
    # Regenerate the data until it differs from what the client has
    while True:
        lServerDataDict = {
            "CPDict": HiddenCPDictGenerate(inRequest=inRequest, inGSettings=inGSettings),
            "RDPDict": HiddenRDPDictGenerate(inRequest=inRequest, inGSettings=inGSettings),
            "AgentDict": HiddenAgentDictGenerate(inRequest=inRequest, inGSettings=inGSettings),
            "UserDict": {"UACClientDict": inRequest.OpenRPA["DefUserRoleHierarchyGet"]()},
        }
        # Hash of the JSON text. NOTE: str hash is salted per Python process,
        # so the value is comparable only within one server run
        lServerDataHashStr = str(hash(json.dumps(lServerDataDict)))
        if lServerDataHashStr not in ("", None) and lClientHashStr != lServerDataHashStr:
            break  # Case if Hash is not equal
        # Case Hashes are equal - wait and try again
        time.sleep(inGSettings["Client"]["Session"]["ControlPanelRefreshIntervalSecFloat"])
    # Return the result if Hash is changed
    lResult = {"HashStr": lServerDataHashStr, "ServerDataDict": lServerDataDict}
    # Send message back to client as utf-8 data
    inRequest.OpenRPAResponseDict["Body"] = json.dumps(lResult).encode("utf8")
    return lResult
|
|
|
|
#v1.2.0 Send data container to the client from the server
|
|
# /pyOpenRPA/ServerLog return {"HashStr" , "ServerLogList": ["row 1", "row 2"]}
|
|
# Client: mGlobal.pyOpenRPA.ServerLogListHashStr
|
|
# Client: mGlobal.pyOpenRPA.ServerLogList
|
|
def pyOpenRPA_ServerLog(inRequest,inGSDict):
    """Long-poll handler: answer when the server log hash differs from the client hash.

    The request body (if any) is the hash string the client already has. The
    def waits (sleeping ``inGSDict["Client"]["DumpLogListRefreshIntervalSecFloat"]``
    between checks) until ``inGSDict["Client"]["DumpLogListHashStr"]`` is set
    and is not equal to the client hash.

    :param inRequest: request object; reads ``headers``, ``rfile`` and writes
        ``OpenRPAResponseDict["Body"]``
    :param inGSDict: orchestrator global settings dict
    :return: dict ``{"HashStr": ..., "ServerLogList": [...]}`` (the same
        content is written to the response body as utf8 JSON)
    """
    # Extract the hash value from request
    lClientHashStr = None
    lContentLengthStr = inRequest.headers.get('Content-Length')
    if lContentLengthStr is not None:
        # Convert the body bytes to the str
        lClientHashStr = inRequest.rfile.read(int(lContentLengthStr)).decode('utf8')
    while True:
        lServerLogList = inGSDict["Client"]["DumpLogList"]
        # Get hash
        lServerLogListHashStr = inGSDict["Client"]["DumpLogListHashStr"]
        # The server hash must be set (not "" and not None) and must differ
        # from the client hash. None check is needed because None can be
        # obtained without JSON decode
        if lServerLogListHashStr not in ("", None) and lClientHashStr != lServerLogListHashStr:
            break
        # Case Hashes are equal - wait and check again
        time.sleep(inGSDict["Client"]["DumpLogListRefreshIntervalSecFloat"])
    # Return the result if Hash is changed
    lResult = {"HashStr": lServerLogListHashStr, "ServerLogList": lServerLogList}
    # Send message back to client as utf-8 data
    inRequest.OpenRPAResponseDict["Body"] = json.dumps(lResult).encode("utf8")
    return lResult
|
|
|
|
def pyOpenRPA_Screenshot(inRequest,inGlobalDict):
    """Take a screenshot of the whole virtual screen and return it as PNG.

    The image is saved to "Screenshot.png" in the current working directory
    and the file content is written to ``inRequest.OpenRPAResponseDict["Body"]``.

    :param inRequest: request object; ``OpenRPAResponseDict["Body"]`` is written
    :param inGlobalDict: orchestrator global settings dict (not used here)
    """
    lFilePathStr = "Screenshot.png"

    def SaveScreenshot(inFilePath):
        """Grab the entire virtual screen and save it as a PNG to inFilePath."""
        lScreenshot = getScreenAsImage()
        # Save to the requested path. The previous code ignored the argument
        # and wrote 'screenshot.png', which is a different file than the one
        # read below on a case-sensitive file system
        lScreenshot.save(inFilePath, format='png')

    # Save the file to the disk
    SaveScreenshot(lFilePathStr)
    # Read the raw PNG bytes; the context manager closes the file even if the
    # read raises
    with open(lFilePathStr, "rb") as lFileObject:
        inRequest.OpenRPAResponseDict["Body"] = lFileObject.read()
|
|
# Add activity item or activity list to the processor queue
|
|
# Body is Activity item or Activity List
|
|
def pyOpenRPA_Processor(inRequest, inGSettings):
    """Add the activity item or the activity list from the body to the processor queue.

    The body is a JSON document: a list of activity items is appended item by
    item, any other JSON value is appended as one activity item. A request
    without the Content-Length header carries no body and is ignored
    (previously it failed with NameError).

    :param inRequest: request object; reads ``headers`` and ``rfile``
    :param inGSettings: orchestrator global settings dict;
        ``inGSettings["ProcessorDict"]["ActivityList"]`` is extended in place
    """
    # Receive the data
    lContentLengthStr = inRequest.headers.get('Content-Length')
    if lContentLengthStr is None:
        return  # No body - nothing to enqueue
    lInputByteArray = inRequest.rfile.read(int(lContentLengthStr))
    # Convert the body bytes to the object
    lInput = json.loads(lInputByteArray.decode('utf8'))
    if isinstance(lInput, list):
        # If list - operator plus
        inGSettings["ProcessorDict"]["ActivityList"] += lInput
    else:
        inGSettings["ProcessorDict"]["ActivityList"].append(lInput)
|
|
# Execute activity list
|
|
def pyOpenRPA_ActivityListExecute(inRequest, inGSettings):
    """Execute the activity item or the activity list from the body immediately.

    The body is a JSON document. For a list the response body is the JSON list
    of results; for a single activity item the response body is the JSON of
    its single result. A request without the Content-Length header carries no
    body and is ignored (previously it failed with NameError).

    :param inRequest: request object; reads ``headers`` and ``rfile``, writes
        ``OpenRPAResponseDict["Body"]``
    :param inGSettings: orchestrator global settings dict, passed to
        ``Processor.ActivityListExecute``
    """
    # Receive the data
    lContentLengthStr = inRequest.headers.get('Content-Length')
    if lContentLengthStr is None:
        return  # No body - nothing to execute
    lInputByteArray = inRequest.rfile.read(int(lContentLengthStr))
    # Convert the body bytes to the object
    lInput = json.loads(lInputByteArray.decode('utf8'))
    if isinstance(lInput, list):
        # Activity list - return the list of the results
        lResultList = Processor.ActivityListExecute(inGSettings = inGSettings, inActivityList = lInput)
        inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lResultList), "utf8")
    else:
        # Single activity item - wrap into a list, return the only result
        lResultList = Processor.ActivityListExecute(inGSettings = inGSettings, inActivityList = [lInput])
        inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lResultList[0]), "utf8")
|
|
|
|
# See docs in Agent (pyOpenRPA.Agent.O2A)
|
|
def pyOpenRPA_Agent_O2A(inRequest, inGSettings):
    """Orchestrator-to-agent channel placeholder (test solution).

    Never returns: the def only sleeps in 3 second steps, so the request is
    kept open forever and nothing is written to the response.

    :param inRequest: request object (not used here)
    :param inGSettings: orchestrator global settings dict (not used here)
    """
    lSleepSecInt = 3
    # Test solution
    while True:
        time.sleep(lSleepSecInt)
|
|
|
|
# See docs in Agent (pyOpenRPA.Agent.A2O)
|
|
def pyOpenRPA_Agent_A2O(inRequest, inGSettings):
    """Agent-to-orchestrator channel: receive the data which the agent has sent.

    The body is a JSON document. When it contains "LogList", every item of the
    list is written to the orchestrator logger with the info level. A request
    without the Content-Length header carries no body and is ignored
    (previously it failed with NameError).

    :param inRequest: request object; reads ``headers`` and ``rfile``
    :param inGSettings: orchestrator global settings dict; uses
        ``inGSettings["Logger"]``
    """
    # Receive the data
    lContentLengthStr = inRequest.headers.get('Content-Length')
    if lContentLengthStr is None:
        return  # No body - nothing to process
    lInputByteArray = inRequest.rfile.read(int(lContentLengthStr))
    print(lInputByteArray)  # Raw payload trace to stdout (kept from the original)
    # Convert the body bytes to the object
    lInput = json.loads(lInputByteArray.decode('utf8'))
    if "LogList" in lInput:
        for lLogItemStr in lInput["LogList"]:
            inGSettings["Logger"].info(lLogItemStr)
|
|
|
|
def SettingsUpdate(inGlobalConfiguration):
    """Append the orchestrator web server routes to the server URL list.

    Item template:
        "Method": "GET|POST"
        "URL": "/index"  - URL of the request
        "MatchType": "BeginWith|Contains|Equal|EqualCase"
        "ResponseFilePath": absolute or relative path
        "ResponseFolderPath": absolute or relative path
        "ResponseContentType": HTTP Content-type
        "ResponseDefRequestGlobal": function with str result

    :param inGlobalConfiguration: orchestrator global settings dict;
        ``inGlobalConfiguration["Server"]["URLList"]`` is replaced with a new
        list (old items + the items below)
    :return: the same ``inGlobalConfiguration`` object
    """
    import os
    import pyOpenRPA.Orchestrator
    # Folder of the pyOpenRPA.Orchestrator package (text before the last "\\")
    lOrchestratorDirStr = pyOpenRPA.Orchestrator.__file__.rpartition("\\")[0]

    def lPath(inRelativePathStr):
        """Join the orchestrator folder with the relative path."""
        return os.path.join(lOrchestratorDirStr, inRelativePathStr)

    # List of available URLs with the orchestrator server
    lURLList = [
        # Orchestrator basic dependencies
        {"Method":"GET", "URL": "/", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("Web\\Index.xhtml"),
         "ResponseContentType": "text/html"},
        {"Method":"GET", "URL": "/Index.js", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("Web\\Index.js"),
         "ResponseContentType": "text/javascript"},
        {"Method":"GET", "URL": "/3rdParty/Semantic-UI-CSS-master/semantic.min.css", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("..\\Resources\\Web\\Semantic-UI-CSS-master\\semantic.min.css"),
         "ResponseContentType": "text/css"},
        {"Method":"GET", "URL": "/3rdParty/Semantic-UI-CSS-master/semantic.min.js", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("..\\Resources\\Web\\Semantic-UI-CSS-master\\semantic.min.js"),
         "ResponseContentType": "application/javascript"},
        {"Method":"GET", "URL": "/3rdParty/jQuery/jquery-3.1.1.min.js", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("..\\Resources\\Web\\jQuery\\jquery-3.1.1.min.js"),
         "ResponseContentType": "application/javascript"},
        {"Method":"GET", "URL": "/3rdParty/Google/LatoItalic.css", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("..\\Resources\\Web\\Google\\LatoItalic.css"),
         "ResponseContentType": "font/css"},
        {"Method":"GET", "URL": "/3rdParty/Semantic-UI-CSS-master/themes/default/assets/fonts/icons.woff2", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("..\\Resources\\Web\\Semantic-UI-CSS-master\\themes\\default\\assets\\fonts\\icons.woff2"),
         "ResponseContentType": "font/woff2"},
        {"Method":"GET", "URL": "/favicon.ico", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("Web\\favicon.ico"),
         "ResponseContentType": "image/x-icon"},
        {"Method":"GET", "URL": "/3rdParty/Handlebars/handlebars-v4.1.2.js", "MatchType": "EqualCase",
         "ResponseFilePath": lPath("..\\Resources\\Web\\Handlebars\\handlebars-v4.1.2.js"),
         "ResponseContentType": "application/javascript"},
        {"Method": "GET", "URL": "/Monitor/ControlPanelDictGet", "MatchType": "Equal",
         "ResponseDefRequestGlobal": BackwardCompatibility.v1_2_0_Monitor_ControlPanelDictGet_SessionCheckInit,
         "ResponseContentType": "application/json"},
        {"Method": "GET", "URL": "/GetScreenshot", "MatchType": "BeginWith",
         "ResponseDefRequestGlobal": pyOpenRPA_Screenshot,
         "ResponseContentType": "image/png"},
        {"Method": "GET", "URL": "/pyOpenRPA_logo.png", "MatchType": "Equal",
         "ResponseFilePath": lPath("..\\Resources\\Web\\pyOpenRPA_logo.png"),
         "ResponseContentType": "image/png"},
        {"Method": "POST", "URL": "/Orchestrator/UserRoleHierarchyGet", "MatchType": "Equal",
         "ResponseDefRequestGlobal": BackwardCompatibility.v1_2_0_UserRoleHierarchyGet,
         "ResponseContentType": "application/json"},
        # New way of the v.1.2.0 functionality (all defs by the URL from /pyOpenRPA/...)
        {"Method": "POST", "URL": "/pyOpenRPA/ServerData", "MatchType": "Equal",
         "ResponseDefRequestGlobal": pyOpenRPA_ServerData,
         "ResponseContentType": "application/json"},
        {"Method": "POST", "URL": "/pyOpenRPA/ServerLog", "MatchType": "Equal",
         "ResponseDefRequestGlobal": pyOpenRPA_ServerLog,
         "ResponseContentType": "application/json"},
        {"Method": "GET", "URL": "/pyOpenRPA/Screenshot", "MatchType": "BeginWith",
         "ResponseDefRequestGlobal": pyOpenRPA_Screenshot,
         "ResponseContentType": "image/png"},
        {"Method": "POST", "URL": "/pyOpenRPA/Processor", "MatchType": "Equal",
         "ResponseDefRequestGlobal": pyOpenRPA_Processor,
         "ResponseContentType": "application/json"},
        {"Method": "POST", "URL": "/pyOpenRPA/ActivityListExecute", "MatchType": "Equal",
         "ResponseDefRequestGlobal": pyOpenRPA_ActivityListExecute,
         "ResponseContentType": "application/json"},
        {"Method": "POST", "URL": "/pyOpenRPA/Agent/O2A", "MatchType": "Equal",
         "ResponseDefRequestGlobal": pyOpenRPA_Agent_O2A,
         "ResponseContentType": "application/json"},
        {"Method": "POST", "URL": "/pyOpenRPA/Agent/A2O", "MatchType": "Equal",
         "ResponseDefRequestGlobal": pyOpenRPA_Agent_A2O,
         "ResponseContentType": "application/json"},
    ]
    # Build a new list (the previous list object is not mutated)
    lServerDict = inGlobalConfiguration["Server"]
    lServerDict["URLList"] = lServerDict["URLList"] + lURLList
    return inGlobalConfiguration