You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ORPA-pyOpenRPA/Sources/pyOpenRPA/Tools/RobotRDPActive/Connector.py

199 lines
9.7 KiB

#Import parent folder to import current / other packages
from pyOpenRPA.Robot import UIDesktop #Lib to access RDP window
import os #os for process run
import uuid #temp id for Template.rdp
import tempfile #Temporary location
import time
import subprocess
from . import Clipboard # Clipboard functions get/set
import keyboard
import time
import random # random integers
from win32api import GetSystemMetrics # Get Screen rect
#Connect to RDP session
"""
{
"Host": "", #Host address
"Port": "", #RDP Port
"Login": "", # Login
"Password": "", #Password
"Screen": {
"Resolution":"FullScreen", #"640x480" or "1680x1050" or "FullScreen". If Resolution not exists set full screen
"FlagUseAllMonitors": False, # True or False
"DepthBit":"" #"32" or "24" or "16" or "15"
}
}
"""
def Session(inRDPSessionConfiguration):
    """Open the RDP session described by inRDPSessionConfiguration.

    Creates a temporary .rdp file, stores the credentials for the host,
    starts the RDP client and finally deletes the temporary file.

    inRDPSessionConfiguration: configuration dict; "Host", "Login" and
        "Password" are read here. The key "SessionHex" is added to it.
    Returns the same dict that was passed in (mutated in place).
    Raises whatever the called helpers raise (for example the Exception from
    SessionRDPStart when the session window never appears).
    """
    (lRDPFile, lSessionHex) = SessionConfigurationCreate(inRDPSessionConfiguration)
    try:
        # Keep the session hex in the configuration so callers can find the window later
        inRDPSessionConfiguration["SessionHex"] = lSessionHex
        # Set login/password
        SessionLoginPasswordSet(
            inRDPSessionConfiguration["Host"],
            inRDPSessionConfiguration["Login"],
            inRDPSessionConfiguration["Password"],
        )
        # Start session
        SessionRDPStart(lRDPFile)
        # Delay before deleting: the RDP client does not read the file in one moment
        time.sleep(4)
    finally:
        # Always delete the temp .rdp, even when a step above failed
        os.remove(lRDPFile)
    return inRDPSessionConfiguration
#Add login/ password to the windows credentials to run RDP
def SessionLoginPasswordSet(inHost, inLogin, inPassword):
    """Store the login/password for the host via the Windows `cmdkey` utility.

    The credential is saved under the generic target TERMSRV/<inHost>.
    No explicit delete of a previous entry is done: setting the new
    user/password replaces the previous credentials for the target.

    inHost: host address used in the target name.
    inLogin: user name passed as /user.
    inPassword: password passed as /pass.
    Returns None. The exit code of cmdkey is not checked (same as before).
    """
    # The arguments are passed as a list without a shell, so characters such as
    # '"', '%', '&' inside the password can not break or extend the command line.
    subprocess.run(
        [
            "cmdkey",
            f"/generic:TERMSRV/{inHost}",
            f"/user:{inLogin}",
            f"/pass:{inPassword}",
        ],
        check=False,
    )
    return None
#Create current .rdp file with settings
#Return (full path to file, session hex)
def SessionConfigurationCreate(inConfiguration):
    """Create a temporary .rdp file from Template.rdp and the given configuration.

    Template.rdp is read from the folder of this module. The placeholders
    {Width}, {Height}, {BitDepth}, {HostPort}, {Login} and {SharedDriveList}
    are replaced and the result is written to "<uuid hex>.rdp" in the system
    temp folder.

    inConfiguration: dict with the keys
        "Host" (required), "Login" (required),
        "Port" (optional; appended as ":<Port>" when not empty),
        "Screen" (optional dict; "Width" default 1680, "Height" default 1050,
                  "DepthBit" default "32"),
        "SharedDriveList" (optional list of drive letters; default no drives).
    Returns a tuple (full path to the created .rdp file, session hex), where
    the session hex is the file name without the ".rdp" extension.
    Raises KeyError when "Host" or "Login" is missing and OSError when the
    template can not be read or the temp file can not be written.
    """
    # Template.rdp is located next to this module
    lRobotRDPActiveFolderPath = os.path.dirname(os.path.abspath(__file__))
    lRDPTemplateFileFullPath = os.path.join(lRobotRDPActiveFolderPath, "Template.rdp")
    # .rdp encoding is UCS-2 LE BOM = UTF-16 LE
    with open(lRDPTemplateFileFullPath, "r", encoding="utf-16-le") as lTemplateFile:
        lRDPTemplateFileContent = lTemplateFile.read()
    # Prepare host:port (the port is added only when it is set and not empty)
    lHostPort = inConfiguration['Host']
    lPort = inConfiguration.get('Port')
    if lPort:
        lHostPort = f"{lHostPort}:{lPort}"
    # Generate parameter for .rdp "drivestoredirect:s:C:\;"
    # Attention - all drives must be only in upper case!!!
    lDriveStoreDirectStr = "".join(
        f"{lItem.upper()}:\\;" for lItem in inConfiguration.get('SharedDriveList', [])
    )
    lScreen = inConfiguration.get('Screen', {})
    # str() on every value: str.replace accepts only strings, the config may hold ints
    lReplaceDict = {
        "{Width}": str(lScreen.get("Width", 1680)),
        "{Height}": str(lScreen.get("Height", 1050)),
        "{BitDepth}": str(lScreen.get("DepthBit", "32")),
        "{HostPort}": str(lHostPort),
        "{Login}": str(inConfiguration['Login']),
        "{SharedDriveList}": lDriveStoreDirectStr,
    }
    # dict keeps insertion order, so the placeholders are replaced in the order above
    for lPlaceholder, lValue in lReplaceDict.items():
        lRDPTemplateFileContent = lRDPTemplateFileContent.replace(lPlaceholder, lValue)
    # Save the filled template to the temp file; the uuid hex is the session hex
    lSessionHex = uuid.uuid4().hex
    lRDPCurrentFileFullPath = os.path.join(tempfile.gettempdir(), f"{lSessionHex}.rdp")
    with open(lRDPCurrentFileFullPath, "w", encoding="utf-16-le") as lRDPFile:
        lRDPFile.write(lRDPTemplateFileContent)
    return (lRDPCurrentFileFullPath, lSessionHex)
#RDPSessionStart
def _SessionWindowWaitAppear(inRDPFileName):
    """Wait up to 30 seconds for the RDP session window; return the wait result."""
    return UIDesktop.UIOSelectorsSecs_WaitAppear_List(
        [
            [{"title_re": f"{inRDPFileName}.*",
              "class_name": "TscShellContainerClass", "backend": "win32"}]
        ],
        30
    )


def SessionRDPStart(inRDPFilePath):
    """Run the .rdp file and wait until the RDP session window is on the screen.

    If the "unknown publisher" confirmation dialog appears (Russian or English
    Windows), the "do not ask again" checkbox is checked, the connect button
    is clicked and the session window is awaited again. After that the
    session window is shrunk to the small size by SessionScreen100x550.

    inRDPFilePath: full path to the .rdp file (backslash separated); the file
        name without the 4 character extension is used to find the window.
    Returns None.
    Raises Exception when the session window did not appear.
    """
    # Run the rdp session (started through the shell, the call does not block)
    subprocess.Popen([inRDPFilePath], shell=True)
    lRDPFileName = (inRDPFilePath.split("\\")[-1])[0:-4]
    # Wait for one of: confirmation dialog (RUS) / confirmation dialog (ENG) / session window.
    # The result is checked below by the indexes of the selectors in this list.
    lWaitResult = UIDesktop.UIOSelectorsSecs_WaitAppear_List(
        [
            [{"title": "Подключение к удаленному рабочему столу", "class_name": "#32770", "backend": "win32"},
             {"title": "Боль&ше не выводить запрос о подключениях к этому компьютеру", "friendly_class_name": "CheckBox"}],
            [{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
             {"title": "D&on't ask me again for connections to this computer",
              "friendly_class_name": "CheckBox"}],
            [{"title_re": f"{lRDPFileName}.*",
              "class_name": "TscShellContainerClass", "backend": "win32"}]
        ],
        30
    )
    # Selector 0 appeared: confirmation dialog (RUS)
    if 0 in lWaitResult:
        # Check the box "do not ask again"
        UIDesktop.UIOSelector_Get_UIO([{"title": "Подключение к удаленному рабочему столу", "backend": "win32"},
                                       {"title": "Боль&ше не выводить запрос о подключениях к этому компьютеру", "friendly_class_name": "CheckBox"}]).check()
        # Go to connection
        UIDesktop.UIOSelector_Get_UIO([{"title": "Подключение к удаленному рабочему столу", "backend": "win32"},
                                       {"title":"Подкл&ючить", "class_name":"Button"}]).click()
        lWaitResult = _SessionWindowWaitAppear(lRDPFileName)
    # Selector 1 appeared: confirmation dialog (ENG)
    if 1 in lWaitResult:
        # Check the box "do not ask again"
        UIDesktop.UIOSelector_Get_UIO([{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
                                       {"title": "D&on't ask me again for connections to this computer",
                                        "friendly_class_name": "CheckBox"}]).check()
        # Go to connection
        UIDesktop.UIOSelector_Get_UIO([{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
                                       {"title": "Co&nnect", "class_name": "Button"}]).click()
        lWaitResult = _SessionWindowWaitAppear(lRDPFileName)
    # Raise exception if RDP is not active (nothing appeared within the timeout)
    if len(lWaitResult) == 0:
        raise Exception("Error when initialize the RDP session!")
    # Prepare little window
    SessionScreen100x550(lRDPFileName)
    return None
#Set fullscreen for app
def SessionScreenFull(inSessionHex):
    """Focus the RDP window of the session, maximize it and switch it to full screen.

    inSessionHex: session hex; the window is found by the title regex "<inSessionHex>.*".
    Returns None.
    """
    lWindowSelector = [{"title_re": f"{inSessionHex}.*", "backend": "win32"}]
    lWindow = UIDesktop.UIOSelector_Get_UIO(lWindowSelector)
    lWindow.set_focus()
    lWindow.maximize()
    # Nothing more to do when the window already covers the whole screen
    if SessionIsFullScreen(inSessionHex):
        return None
    # Ctrl+Alt+Break - presumably the full screen toggle of the RDP client
    lWindow.type_keys("^%{BREAK}")
    # Short pause after the key press before returning to the caller
    time.sleep(0.5)
    return None
#Set Little window of the session
def SessionScreen100x550(inSessionHex):
    """Leave full screen (if active), restore the RDP window and make it small.

    inSessionHex: session hex; the window is found by the title regex "<inSessionHex>.*".
    Returns None.
    """
    lWindowSelector = [{"title_re": f"{inSessionHex}.*", "backend": "win32"}]
    lWindow = UIDesktop.UIOSelector_Get_UIO(lWindowSelector)
    lWindow.set_focus()
    lIsFullScreen = SessionIsFullScreen(inSessionHex)
    if lIsFullScreen:
        # Ctrl+Alt+Break - presumably the full screen toggle of the RDP client
        lWindow.type_keys("^%{BREAK}")
        time.sleep(0.5)
    lWindow.restore()
    # Short pause between restore and move
    time.sleep(0.5)
    # Arguments are presumably x, y, width, height - TODO confirm against the UI library
    lWindow.move_window(10, 10, 550, 100)
    return None
#Type command in CMD
# inFlagDoCrossCheck: True - Do check that CMD is executed (the text response will not be available)
#   (stale note: the signature below has no inFlagDoCrossCheck parameter, the mode is set by inModeStr)
# inModeStr "LISTEN", "CROSSCHECK", "RUN"
# "LISTEN" - Get result of the cmd command in result TODO get home script
# "CROSSCHECK" - Check if the command was successfully sent TODO get home script
# "RUN" - Run without crosscheck and get clipboard
# return ...
# example Connector.SessionCMDRun("4d1e48f3ff6c45cc810ea25d8adbeb50","start notepad", "RUN")
def SessionCMDRun(inSessionHex, inCMDCommandStr, inModeStr="RUN"):
    """Type "cmd /c <command>" into the Run dialog (Win+R) of the RDP session.

    The session window is switched to full screen, the keys are sent through
    the keyboard module, and the window is made small again afterwards.

    inSessionHex: session hex used to find the RDP window.
    inCMDCommandStr: command text placed after "cmd /c".
    inModeStr: "RUN" (default) - nothing is appended to the command;
        "LISTEN" - "| clip" is appended;
        "CROSSCHECK" - "| echo <random integer> | clip" is appended.
        Any other value behaves like "RUN".
    No value is returned explicitly; the clipboard is not read here.
    """
    # Choose the postfix of the command by the mode
    if inModeStr == "LISTEN":
        lPostfixStr = "| clip"
    elif inModeStr == "CROSSCHECK":
        lMarkerInt = random.randrange(999, 9999999)
        lPostfixStr = f"| echo {lMarkerInt} | clip"
    else:
        lPostfixStr = ""  # Case default "RUN"
    SessionScreenFull(inSessionHex)
    time.sleep(2)
    # The returned UI object is not used; the lookup is kept as it was
    lWindowSelector = [{"title_re": f"{inSessionHex}.*", "backend": "win32"}]
    UIDesktop.UIOSelector_Get_UIO(lWindowSelector)
    # Open the Run dialog and type the command
    keyboard.press_and_release('win+r')
    time.sleep(1)
    lCommandLineStr = f"cmd /c {inCMDCommandStr} {lPostfixStr}"
    keyboard.write(lCommandLineStr)
    time.sleep(1)
    # TODO cross check from clipboard
    keyboard.press_and_release('enter')
    SessionScreen100x550(inSessionHex)
# Check if session is in Full screen mode
# Return True - is in fullscreen
# example print(Connector.SessionIsFullScreen(""))
def SessionIsFullScreen(inSessionHexStr):
    """Check if the RDP window of the session has exactly the size of the screen.

    inSessionHexStr: session hex; the window is found by the title regex "<inSessionHexStr>.*".
    Returns True when both the width and the height of the window rectangle are
    equal to GetSystemMetrics(0) and GetSystemMetrics(1), otherwise False.
    """
    # Screen size from the system metrics (indexes 0 and 1)
    lScreenWidth = GetSystemMetrics(0)
    lScreenHeight = GetSystemMetrics(1)
    # Rectangle of the session window
    lWindowSelector = [{"title_re": f"{inSessionHexStr}.*", "backend": "win32"}]
    lWindowRect = UIDesktop.UIOSelector_Get_UIO(lWindowSelector).rectangle()
    lWindowWidth = lWindowRect.right - lWindowRect.left
    lWindowHeight = lWindowRect.bottom - lWindowRect.top
    # Any difference in size means the window is not in full screen
    if lWindowHeight != lScreenHeight or lWindowWidth != lScreenWidth:
        return False
    return True