import json
from inspect import signature # For detect count of def args
#ControlPanelDict
from desktopmagic.screengrab_win32 import (
    getDisplayRects, saveScreenToBmp, saveRectToBmp, getScreenAsImage,
    getRectAsImage, getDisplaysAsImages)
from http import cookies
import uuid # generate UUID4
import time # sleep functions
import datetime # datetime functions
import threading # Multi-threading

#v1.2.0 Send data container to the client from the server
# /pyOpenRPA/ServerData return {"HashStr" , "ServerDataDict"}
# Client: mGlobal.pyOpenRPA.ServerDataHashStr
# Client: mGlobal.pyOpenRPA.ServerDataDict
def pyOpenRPA_ServerData(inRequest,inGSettings):
    """
    Return the server data container for the client.

    In this version the container is always empty: neither argument is read.

    :param inRequest: request handler (not used)
    :param inGSettings: global settings dict (not used)
    :return: dict {"HashStr": "", "ServerDataDict": {}}
    """
    lResult = {"HashStr": "", "ServerDataDict": {}}
    return lResult

# /Orchestrator/RobotRDPActive/ControlPanelDictGet
def RobotRDPActive_ControlPanelDictGet(inRequest,inGlobalDict):
    """
    Write the list of RDP sessions as UTF-8 encoded JSON to the response body.

    Result structure:
    {"DataList": [{"SessionKeyStr": ..., "SessionHexStr": ..., "IsFullScreenBool": ..., "IsIgnoredBool": ...}, ...]}

    :param inRequest: request handler; the result is written to inRequest.OpenRPAResponseDict["Body"]
    :param inGlobalDict: global settings dict; inGlobalDict["RobotRDPActive"] is read
    :return: None
    """
    inResponseDict = inRequest.OpenRPAResponseDict
    lRDPList = inGlobalDict["RobotRDPActive"]["RDPList"]
    lResultDict = {
        "DataList": [
            {
                "SessionKeyStr": lRDPSessionKeyStrItem, # Session key str
                "SessionHexStr": lRDPConfiguration["SessionHex"], # Session Hex
                # The session is full screen when its key equals the configured full screen session key
                "IsFullScreenBool": lRDPSessionKeyStrItem == inGlobalDict["RobotRDPActive"]["FullScreenRDPSessionKeyStr"],
                "IsIgnoredBool": lRDPConfiguration["SessionIsIgnoredBool"], # Is ignored
            }
            for lRDPSessionKeyStrItem, lRDPConfiguration in lRDPList.items()
        ]
    }
    # Send message back to client
    message = json.dumps(lResultDict)
    # Write content as utf-8 data
    inResponseDict["Body"] = bytes(message, "utf8")

# def to check control panels for selected
# session
def Monitor_ControlPanelDictGet_SessionCheckInit(inRequest,inGlobalDict):
    """
    Wait until the control panel dataset of the client session changes, then return it.

    The session GUID is read from the "SessionGUIDStr" request header. If the GUID is unknown, a new session
    record is created in inGlobalDict["Client"]["Session"]["TechnicalSessionGUIDCache"]; if the header is absent,
    a UUID4 is generated for the record instead of storing the session under the key None.
    After that the control panel is rendered every "ControlPanelRefreshIntervalSecFloat" seconds until either:
    - the rendered dataset differs from the one stored in the session: it is written to the response "Body";
    - "LifetimeRequestSecFloat" seconds have passed: the response "Body" is not touched.

    :param inRequest: request handler; .headers, .OpenRPA["User"], .OpenRPA["Domain"] are read,
        .OpenRPAResponseDict ("Body", "SetCookies") is written
    :param inGlobalDict: global settings dict
    :return: None
    """
    lL = inGlobalDict["Logger"] # Alias for logger
    lLifetimeRequestSecFloat = inGlobalDict["Client"]["Session"]["LifetimeRequestSecFloat"]
    lControlPanelRefreshIntervalSecFloat = inGlobalDict["Client"]["Session"]["ControlPanelRefreshIntervalSecFloat"]
    lCookieSessionGUIDStr = None # GUID of the session served by this request. Is set on the step 1

    # Technicaldef - get the session cache. The dict is looked up on every call, no reference is kept
    def SessionCacheGet():
        return inGlobalDict["Client"]["Session"]["TechnicalSessionGUIDCache"]

    # Technicaldef - render the control panel and flag the session if the dataset is changed
    def TechnicalCheck():
        lControlPanelDict = SessionCacheGet()[lCookieSessionGUIDStr]["DatasetLast"]["ControlPanel"]
        lDatasetCurrentBytes = Monitor_ControlPanelDictGet(inRequest,inGlobalDict) # Call the control panel
        if lDatasetCurrentBytes != lControlPanelDict["Data"]: # Check if dataset is changed
            lControlPanelDict["Data"] = lDatasetCurrentBytes # Set new dataset
            lControlPanelDict["ReturnBool"] = True # Set flag to return the data

    # Technicaldef - create new session struct. Return the GUID of the session
    def TechnicalSessionNew(inSessionGUIDStr):
        # Use the GUID of the client. Generate the new GUID if the client did not send one
        lNewSessionGUIDStr = inSessionGUIDStr if inSessionGUIDStr is not None else str(uuid.uuid4())
        lSessionNew = { # Session with some GUID str. On client session guid stored in cookie "SessionGUIDStr"
            "InitDatetime": datetime.datetime.now(), # Datetime when session GUID was created
            "DatasetLast": {
                "ControlPanel": {
                    "Data": None, # Struct to check with new iterations. None if starts
                    "ReturnBool": False # flag to return, close request and return data as json
                }
            },
            "ClientRequestHandler": inRequest, # Last client request handler
            "UserADStr": inRequest.OpenRPA["User"], # User, who connect. None if user is not exists
            "DomainADStr": inRequest.OpenRPA["Domain"], # Domain of the user who connect. None if user is not exists
        }
        SessionCacheGet()[lNewSessionGUIDStr] = lSessionNew # Set new session in dict
        inRequest.OpenRPAResponseDict["SetCookies"]["SessionGUIDStr"] = lNewSessionGUIDStr # Set SessionGUIDStr in cookies
        if lL: lL.info(f"New session GUID is created. GUID {lNewSessionGUIDStr}")
        return lNewSessionGUIDStr

    # Step 1 - get the session GUID from the request header "SessionGUIDStr"
    lSessionGUIDStr = inRequest.headers.get("SessionGUIDStr", None)
    if lSessionGUIDStr is not None and lSessionGUIDStr in SessionCacheGet():
        lCookieSessionGUIDStr = lSessionGUIDStr # Get the existing GUID
        # Update the datetime of the request session
        SessionCacheGet()[lCookieSessionGUIDStr]["InitDatetime"]=datetime.datetime.now()
    else:
        lCookieSessionGUIDStr = TechnicalSessionNew(inSessionGUIDStr = lSessionGUIDStr) # Create new session

    # Step 2 - interval check if data is exist
    lTimeStartSecFloat = time.time()
    lDoWhileBool = True # Flag to iterate through the lifetime of the request
    while lDoWhileBool:
        if lCookieSessionGUIDStr in SessionCacheGet():
            lItemValue = SessionCacheGet()[lCookieSessionGUIDStr]
            if (time.time() - lTimeStartSecFloat) >= lLifetimeRequestSecFloat: # Check if lifetime client request is over
                if lL: lL.debug("Client request lifetime is over")
                lDoWhileBool = False # Stop the iterations
            if lDoWhileBool:
                TechnicalCheck() # Calculate the CP
                lControlPanelDict = lItemValue["DatasetLast"]["ControlPanel"]
                if lControlPanelDict["ReturnBool"] == True: # Return data if data flag is True
                    inRequest.OpenRPAResponseDict["Body"] = lControlPanelDict["Data"] # Send the new dataset
                    lControlPanelDict["ReturnBool"] = False # Set flag that data was returned
                    lDoWhileBool = False # Stop the iterations
        else:
            # The session is not in the cache any more - create it again with the same GUID
            lCookieSessionGUIDStr = TechnicalSessionNew(inSessionGUIDStr = lCookieSessionGUIDStr)
        if lDoWhileBool: # Sleep if we wait the next iteration
            time.sleep(lControlPanelRefreshIntervalSecFloat) # Sleep to the next iteration

def Monitor_ControlPanelDictGet(inRequest,inGlobalDict):
    """
    Render the control panels and the RDP session list, return them as UTF-8 encoded JSON bytes.

    Every item of inGlobalDict["ControlPanelDict"]["RobotList"] has a "KeyStr" and a "RenderFunction".
    The render function is called as def(), def(inGSettings) or def(inRequest, inGSettings) - the form is chosen
    by the count of its parameters. For any other count None is added to the result.
    If inGlobalDict["Server"]["AccessUsers"]["FlagCredentialsAsk"] is True, the item is rendered only when
    the "ControlPanelKeyAllowedList" of the user is empty or contains the "KeyStr" of the item.
    An exception in one item (including a render function that can not be inspected) is logged and the item
    is skipped - it does not break the rendering of the other items.

    :param inRequest: request handler; .OpenRPA["Domain"] and .OpenRPA["User"] are read for the access check
    :param inGlobalDict: global settings dict
    :return: bytes, JSON {"RenderRobotList": [...], "RenderRDPList": [...]}
    """
    lL = inGlobalDict["Logger"] # Alias for logger
    # Create result JSON
    lResultJSON = {"RenderRobotList": [], "RenderRDPList": []}
    for lItem in inGlobalDict["ControlPanelDict"]["RobotList"]:
        lUACBool = True # Check if render function is applicable User Access Rights (UAC)
        if inGlobalDict["Server"]["AccessUsers"]["FlagCredentialsAsk"] is True:
            lUserRights = inGlobalDict["Server"]["AccessUsers"]["RuleDomainUserDict"][(inRequest.OpenRPA["Domain"].upper(),inRequest.OpenRPA["User"].upper())]
            lKeyAllowedList = lUserRights["ControlPanelKeyAllowedList"]
            if len(lKeyAllowedList) > 0 and lItem["KeyStr"] not in lKeyAllowedList:
                lUACBool = False # UAC Check is not passed - False for user
        if lUACBool: # Run function if UAC is TRUE
            # Execute the call and record the result
            # Call def (inRequest, inGSettings) or def (inGSettings)
            try:
                lItemResultDict = None
                # The signature is inspected inside the try: a broken item must not stop the other items
                lDEFARGLen = len(signature(lItem["RenderFunction"]).parameters) # get count of the def args
                if lDEFARGLen == 1: # def (inGSettings)
                    lItemResultDict = lItem["RenderFunction"](inGlobalDict)
                elif lDEFARGLen == 2: # def (inRequest, inGSettings)
                    lItemResultDict = lItem["RenderFunction"](inRequest, inGlobalDict)
                elif lDEFARGLen == 0: # def ()
                    lItemResultDict = lItem["RenderFunction"]()
                lResultJSON["RenderRobotList"].append(lItemResultDict)
            except Exception:
                if lL: lL.exception(f"Error in control panel. CP item {lItem}. Exception is below")
    # Iterate through the RDP list
    lRDPList = inGlobalDict["RobotRDPActive"]["RDPList"]
    lResultJSON["RenderRDPList"] = [
        {
            "SessionKeyStr": lRDPSessionKeyStrItem, # Session key str
            "SessionHexStr": lRDPConfiguration["SessionHex"], # Session Hex
            # The session is full screen when its key equals the configured full screen session key
            "IsFullScreenBool": lRDPSessionKeyStrItem == inGlobalDict["RobotRDPActive"]["FullScreenRDPSessionKeyStr"],
            "IsIgnoredBool": lRDPConfiguration["SessionIsIgnoredBool"], # Is ignored
        }
        for lRDPSessionKeyStrItem, lRDPConfiguration in lRDPList.items()
    ]
    # The caller decides what to do with the data - the response body is not written here
    message = json.dumps(lResultJSON)
    return bytes(message, "utf8")

# UserAccess get rights hierarchy dict in json
def UserRoleHierarchyGet(inRequest,inGlobalDict):
    """
    Write the result of inRequest.OpenRPA["DefUserRoleHierarchyGet"]() as UTF-8 encoded JSON to the response body.

    :param inRequest: request handler; the result is written to inRequest.OpenRPAResponseDict["Body"]
    :param inGlobalDict: global settings dict (not used)
    :return: None
    """
    inResponseDict = inRequest.OpenRPAResponseDict
    lResultDict = inRequest.OpenRPA["DefUserRoleHierarchyGet"]() # Get the User Role Hierarchy list
    # Send message back to client
    message = json.dumps(lResultDict)
    # Write content as utf-8 data
    inResponseDict["Body"] = bytes(message, "utf8")

def GetScreenshot(inRequest,inGlobalDict):
    """
    Grab the screen, save it as a PNG file "Screenshot.png" and write the file content to the response body.

    :param inRequest: request handler; the PNG bytes are written to inRequest.OpenRPAResponseDict["Body"]
    :param inGlobalDict: global settings dict (not used)
    :return: None
    """
    lFilePathStr = "Screenshot.png"
    # Get Screenshot
    def SaveScreenshot(inFilePath):
        # Grab the screen and save it as PNG to inFilePath.
        # The path is taken from the argument: the file written here must be the file which is read below
        lScreenshot = getScreenAsImage()
        lScreenshot.save(inFilePath, format='png')
    # Save the file to the disk
    SaveScreenshot(lFilePathStr)
    # The file is closed by the context manager even if the read fails
    with open(lFilePathStr, "rb") as lFileObject:
        inRequest.OpenRPAResponseDict["Body"] = lFileObject.read()

def SettingsUpdate(inGlobalConfiguration):
    """
    Add the basic URL rules of the orchestrator web server to inGlobalConfiguration["Server"]["URLList"].

    The existing URL list is not mutated: a new list (existing rules + rules below) is set under the key.

    :param inGlobalConfiguration: global settings dict
    :return: inGlobalConfiguration (the same dict)
    """
    import os
    import pyOpenRPA.Orchestrator
    lOrchestratorFolder = os.path.dirname(pyOpenRPA.Orchestrator.__file__)
    lURLList = \
        [ #List of available URLs with the orchestrator server
            #{
            #    "Method":"GET|POST",
            #    "URL": "/index", #URL of the request
            #    "MatchType": "", #"BeginWith|Contains|Equal|EqualCase",
            #    "ResponseFilePath": "", #Absolute or relative path
            #    "ResponseFolderPath": "", #Absolute or relative path
            #    "ResponseContentType": "", #HTTP Content-type
            #    "ResponseDefRequestGlobal": None #Function with str result
            #}
            #Orchestrator basic dependencies
            {"Method":"GET", "URL": "/", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "Web\\Index.xhtml"), "ResponseContentType": "text/html"},
            {"Method":"GET", "URL": "/Index.js", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "Web\\Index.js"), "ResponseContentType": "text/javascript"},
            {"Method":"GET", "URL": "/3rdParty/Semantic-UI-CSS-master/semantic.min.css", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\Semantic-UI-CSS-master\\semantic.min.css"), "ResponseContentType": "text/css"},
            {"Method":"GET", "URL": "/3rdParty/Semantic-UI-CSS-master/semantic.min.js", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\Semantic-UI-CSS-master\\semantic.min.js"), "ResponseContentType": "application/javascript"},
            {"Method":"GET", "URL": "/3rdParty/jQuery/jquery-3.1.1.min.js", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\jQuery\\jquery-3.1.1.min.js"), "ResponseContentType": "application/javascript"},
            # A stylesheet must be served as text/css: "font/css" is not a registered MIME type
            {"Method":"GET", "URL": "/3rdParty/Google/LatoItalic.css", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\Google\\LatoItalic.css"), "ResponseContentType": "text/css"},
            {"Method":"GET", "URL": "/3rdParty/Semantic-UI-CSS-master/themes/default/assets/fonts/icons.woff2", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\Semantic-UI-CSS-master\\themes\\default\\assets\\fonts\\icons.woff2"), "ResponseContentType": "font/woff2"},
            {"Method":"GET", "URL": "/favicon.ico", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "Web\\favicon.ico"), "ResponseContentType": "image/x-icon"},
            {"Method":"GET", "URL": "/3rdParty/Handlebars/handlebars-v4.1.2.js", "MatchType": "EqualCase", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\Handlebars\\handlebars-v4.1.2.js"), "ResponseContentType": "application/javascript"},
            {"Method": "GET", "URL": "/Monitor/ControlPanelDictGet", "MatchType": "Equal", "ResponseDefRequestGlobal": Monitor_ControlPanelDictGet_SessionCheckInit, "ResponseContentType": "application/json"},
            {"Method": "GET", "URL": "/GetScreenshot", "MatchType": "BeginWith", "ResponseDefRequestGlobal": GetScreenshot, "ResponseContentType": "image/png"},
            {"Method": "GET", "URL": "/pyOpenRPA_logo.png", "MatchType": "Equal", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\pyOpenRPA_logo.png"), "ResponseContentType": "image/png"},
            {"Method": "POST", "URL": "/Orchestrator/UserRoleHierarchyGet", "MatchType": "Equal","ResponseDefRequestGlobal": UserRoleHierarchyGet, "ResponseContentType": "application/json"},
            {"Method": "POST", "URL": "/pyOpenRPA/ServerData", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerData, "ResponseContentType": "application/json"}
        ]
    inGlobalConfiguration["Server"]["URLList"]=inGlobalConfiguration["Server"]["URLList"]+lURLList
    return inGlobalConfiguration