You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ORPA-pyOpenRPA/Sources/pyOpenRPA/Orchestrator/RobotRDPActive/Connector.py

441 lines
27 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#Import parent folder to import current / other packages
from pyOpenRPA.Robot import UIDesktop #Lib to access RDP window
from . import ConnectorExceptions # Exceptions classes
import os #os for process run
import uuid #temp id for Template.rdp
import tempfile #Temporary location
import subprocess
from . import Clipboard # Clipboard functions get/set
import keyboard # Keyboard functions
import time
import random # random integers
from win32api import GetSystemMetrics # Get Screen rect
import pyautogui # for hotkey operations
# System variables for recovery scenario
gRecoveryWindowRDPRetryCountInt = 3 # Retry iteration count is RDP window is not responsible
gRecoveryWindowRDPRetryIntervalSecInt = 3 # Retry interval for reconnect
gRecoveryWindowRUNRetryCountInt = 3 # Retry iteration count is RUN window is not responsible
gRecoveryWindowRUNRetryIntervalSecInt = 3 # Retry interval for retry
gRecoveryCMDResponsibleRetryCountInt = 3 # Retry iteration count is CMD is not responsible
gRecoveryCMDResponsibleRetryIntervalSecInt = 3 # Retry interval for retry
gKeyboardHotkeyDelaySecFloat = 0.6 # Const for delay - important for work with RDP!!!!
#Connect to RDP session
"""
{
"Host": "", #Host address
"Port": "", #RDP Port
"Login": "", # Login
"Password": "", #Password
"Screen": {
"Resolution":"FullScreen", #"640x480" or "1680x1050" or "FullScreen". If Resolution not exists set full screen
"FlagUseAllMonitors": False, # True or False
"DepthBit":"" #"32" or "24" or "16" or "15"
}
}
"""
def Session(inRDPSessionConfiguration, inScreenSize550x350Bool = False):
#RDPConnector.SessionConnect(mConfiguration)
#RDPConnector.LoginPassSet("111.222.222.111","ww","dd")
(lRDPFile, lSessionHex) = SessionConfigurationCreate(inRDPSessionConfiguration)
#Set session hex in globalDict
inRDPSessionConfiguration["SessionHex"] = lSessionHex
#Set login/password
SessionLoginPasswordSet(inRDPSessionConfiguration["Host"],inRDPSessionConfiguration["Login"],inRDPSessionConfiguration["Password"])
#Start session
SessionRDPStart(lRDPFile, inScreenSize550x350Bool= inScreenSize550x350Bool)
#Remove temp file
time.sleep(4) #Delete file after some delay - one way to delete and run the RDP before because RDP is not read file in one moment
os.remove(lRDPFile) # delete the temp rdp
# Set the result
return inRDPSessionConfiguration
#Add login/ password to the windows credentials to run RDP
def SessionLoginPasswordSet(inHost, inLogin, inPassword):
#Clear old login/password if it exists
#os.system(f"cmdkey /delete:TERMSRV/{inHost}") #Dont need to delete because new user password will clear the previous creds
#Set login password for host
os.system(f'cmdkey /generic:TERMSRV/{inHost} /user:{inLogin} /pass:"{inPassword}"')
return None
#Create current .rdp file with settings
#Return (full path to file, session hex)
def SessionConfigurationCreate(inConfiguration):
#RobotRDPActive folder path
lFileFullPath=__file__
lFileFullPath = lFileFullPath.replace("/","\\")
lRobotRDPActiveFolderPath = "\\".join(lFileFullPath.split("\\")[:-1])
#Full path to Template.rdp file
lRDPTemplateFileFullPath = os.path.join(lRobotRDPActiveFolderPath, "Template.rdp")
#Open template file (.rdp encoding is USC-2 LE BOM = UTF-16 LE) http://qaru.site/questions/7156020/python-writing-a-ucs-2-little-endian-utf-16-le-file-with-bom
lRDPTemplateFileContent = open(lRDPTemplateFileFullPath, "r", encoding="utf-16-le").read()
#Prepare host:port
lHostPort=inConfiguration['Host']
if 'Port' in inConfiguration:
if inConfiguration['Port']:
lHostPort=f"{lHostPort}:{inConfiguration['Port']}"
# Generate parameter for .rdp "drivestoredirect:s:C:\;"
lDriveStoreDirectStr = ""
for lItem in inConfiguration['SharedDriveList']:
lDriveStoreDirectStr+=f"{lItem.upper()}:\\;" # Attention - all drives must be only in upper case!!!
#Replace {Width}, {Height}, {BitDepth}, {HostPort}, {Login}
lRDPTemplateFileContent = lRDPTemplateFileContent.replace("{Width}", str(inConfiguration.get('Screen',{}).get("Width",1680)))
lRDPTemplateFileContent = lRDPTemplateFileContent.replace("{Height}", str(inConfiguration.get('Screen',{}).get("Height",1050)))
lRDPTemplateFileContent = lRDPTemplateFileContent.replace("{BitDepth}", inConfiguration.get('Screen',{}).get("DepthBit","32"))
lRDPTemplateFileContent = lRDPTemplateFileContent.replace("{HostPort}", lHostPort)
lRDPTemplateFileContent = lRDPTemplateFileContent.replace("{Login}", inConfiguration['Login'])
lRDPTemplateFileContent = lRDPTemplateFileContent.replace("{SharedDriveList}", lDriveStoreDirectStr)
#Save template to temp file
lRDPCurrentFileFullPath = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex}.rdp")
open(lRDPCurrentFileFullPath, "w", encoding="utf-16-le").write(lRDPTemplateFileContent)
#Return .rdp full path
return (lRDPCurrentFileFullPath, (lRDPCurrentFileFullPath.split("\\")[-1])[0:-4])
#RDPSessionStart
def SessionRDPStart(inRDPFilePath, inScreenSize550x350Bool = False):
#Disable certificate warning
lCMDString = 'reg add "HKEY_CURRENT_USER\\Software\\Microsoft\\Terminal Server Client" /v "AuthenticationLevelOverride" /t "REG_DWORD" /d 0 /f'
os.system(lCMDString)
#run rdp session
lItemArgs = [inRDPFilePath]
subprocess.Popen(lItemArgs, shell=True)
#Wait for UAC unknown publisher exists
lRDPFileName = (inRDPFilePath.split("\\")[-1])[0:-4]
lWaitResult = UIDesktop.UIOSelectorsSecs_WaitAppear_List(
[
[{"title": "Подключение к удаленному рабочему столу", "class_name": "#32770", "backend": "win32"},
{"title": "Боль&ше не выводить запрос о подключениях к этому компьютеру", "friendly_class_name": "CheckBox"}],
[{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
{"title": "D&on't ask me again for connections to this computer",
"friendly_class_name": "CheckBox"}],
[{"title_re": f"{lRDPFileName}.*",
"class_name": "TscShellContainerClass", "backend": "win32"},{"depth_start":3, "depth_end": 3, "class_name":"UIMainClass"}]
],
30
)
#Enable certificate warning
lCMDString = 'reg add "HKEY_CURRENT_USER\\Software\\Microsoft\\Terminal Server Client" /v "AuthenticationLevelOverride" /t "REG_DWORD" /d 2 /f'
os.system(lCMDString)
#Click if 0 is appear (RUS)
if 0 in lWaitResult:
#Check the box do not retry
UIDesktop.UIOSelector_Get_UIO([{"title": "Подключение к удаленному рабочему столу", "backend": "win32"},
{"title": "Боль&ше не выводить запрос о подключениях к этому компьютеру", "friendly_class_name": "CheckBox"}]).check()
#Go to connection
UIDesktop.UIOSelector_Get_UIO([{"title": "Подключение к удаленному рабочему столу", "backend": "win32"},
{"title":"Подкл&ючить", "class_name":"Button"}]).click()
lWaitResult = UIDesktop.UIOSelectorsSecs_WaitAppear_List(
[
[{"title_re": f"{lRDPFileName}.*",
"class_name": "TscShellContainerClass", "backend": "win32"}]
],
30
)
# Click if 1 is appear (ENG)
if 1 in lWaitResult:
# Check the box do not retry
UIDesktop.UIOSelector_Get_UIO([{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
{"title": "D&on't ask me again for connections to this computer",
"friendly_class_name": "CheckBox"}]).check()
# Go to connection
UIDesktop.UIOSelector_Get_UIO([{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
{"title": "Co&nnect", "class_name": "Button"}]).click()
lWaitResult = UIDesktop.UIOSelectorsSecs_WaitAppear_List(
[
[{"title_re": f"{lRDPFileName}.*",
"class_name": "TscShellContainerClass", "backend": "win32"}]
],
30
)
# Raise exception if RDP is not active
if len(lWaitResult) == 0:
raise ConnectorExceptions.SessionWindowNotExistError("Error when initialize the RDP session - No RDP windows has appreared!")
time.sleep(3) # Wait for init
if inScreenSize550x350Bool: SessionScreenSize_X_Y_W_H(inSessionHex = lRDPFileName, inXInt = 10, inYInt = 10, inWInt = 550, inHInt = 350) #Prepare little window
return None
#Set fullscreen for app
def SessionScreenFull(inSessionHex, inLogger = None, inRDPConfigurationItem = None):
########################################
lWindowRDPRetryIterator = 0 # Retry iterator if RDP window is not active
lRDPConfigurationItem = inRDPConfigurationItem # Get the RDP configuration item
lL = inLogger # Get the logger instance
lRDPWindow = None # Init the variable
while lWindowRDPRetryIterator<gRecoveryWindowRDPRetryCountInt: # Loop iteration to connect to RDP
try: # Try to get RDP window
lRDPWindow = UIDesktop.UIOSelector_Get_UIO([{"title_re": f"{inSessionHex}.*", "backend": "win32"}])
lWindowRDPRetryIterator = gRecoveryWindowRDPRetryCountInt # Set last iterator to turn off the loop
# RDP window has been detected - go to set focus and maximize
lRDPWindow.set_focus()
lRDPWindow.maximize()
if not SessionIsFullScreen(inSessionHex):
lRDPWindow.type_keys("^%{BREAK}")
time.sleep(0.5)
except Exception as e: # RDP window is not exist - try to reconnect
if lL: lL.warning(f"RDP::SessionScreenFull: RDP window is not exist - try sleep {gRecoveryWindowRDPRetryIntervalSecInt}[s.] and then to reconnect. SessionHex: {inSessionHex}. Current retry iterator is {lWindowRDPRetryIterator}")
if lRDPConfigurationItem:
time.sleep(gRecoveryWindowRDPRetryIntervalSecInt) # Before try to reconnect sleep
# Try to reconnect the RDP
SessionClose(inSessionHexStr=inSessionHex) # Close the
Session(lRDPConfigurationItem) # Create RDP session
inSessionHex = lRDPConfigurationItem["SessionHex"] # Get new session hex after reconnect
SystemRDPWarningClickOk() # Click all warning messages
lWindowRDPRetryIterator = lWindowRDPRetryIterator + 1 # increase the iterator
if lWindowRDPRetryIterator >= gRecoveryWindowRDPRetryCountInt: # Raise the error if retry count is over
if lL: lL.warning(f"RDP::SessionScreenFull: Retry count is over. Raise the error. SessionHex: {inSessionHex}.") # Log the info
raise ConnectorExceptions.SessionWindowNotExistError() # raise the error
else:
if lL: lL.warning(f"RDP::SessionScreenFull: Has no RDP configuration item - don't reconnect. Raise the error. SessionHex: {inSessionHex}") # Log the info
raise ConnectorExceptions.SessionWindowNotExistError() # raise the error
return None
# Set the screen size
def SessionScreenSize_X_Y_W_H(inSessionHex, inXInt, inYInt, inWInt, inHInt):
lDoBool = True
while lDoBool:
#Prepare little window
try:
lRDPWindow = UIDesktop.UIOSelector_Get_UIO([{"title_re": f"{inSessionHex}.*", "backend": "win32"}])
except Exception as e:
return None
try:
lRDPWindow.set_focus()
if SessionIsFullScreen(inSessionHex):
lRDPWindow.type_keys("^%{BREAK}")
time.sleep(0.5)
lRDPWindow.restore()
time.sleep(0.5)
lRDPWindow.move_window(inXInt,inYInt,inWInt,inHInt)
except Exception as e:
time.sleep(1)
else:
lDoBool = False
return None
# Set Little window of the session
def SessionScreen100x550(inSessionHex):
SessionScreenSize_X_Y_W_H(inSessionHex = inSessionHex, inXInt = 10, inYInt = 10, inWInt = 550, inHInt = 100)
return None
# Session - close window
def SessionClose(inSessionHexStr):
#Close window
try:
UIDesktop.UIOSelector_Get_UIO([{"title_re": f"{inSessionHexStr}.*", "backend": "win32"}]).close()
except Exception as e:
pass
#Type command in CMD
# inSessionHex - SessionHex to catch window
# inModeStr "LISTEN", "CROSSCHECK", "RUN"
# "LISTEN" - Get result of the cmd command in result TODO get home script
# "CROSSCHECK" - Check if the command was successufully sent TODO get home script
# "RUN" - Run without crosscheck and get clipboard
# return {
# "OutStr": <> # Result string
# "IsResponsibleBool": True|False # Flag is RDP is responsible - works only when inModeStr = CROSSCHECK
# }
# example Connector.SessionCMDRun("4d1e48f3ff6c45cc810ea25d8adbeb50","start notepad", "RUN")
def SessionCMDRun(inSessionHex,inCMDCommandStr = "echo 1", inModeStr="CROSSCHECK", inClipboardTimeoutSec = 5, inLogger=None, inRDPConfigurationItem=None):
lL = inLogger # Init the logger
# Init the result dict
lResult = {"OutStr": None,"IsResponsibleBool":True}
SessionScreenFull(inSessionHex, inLogger=lL, inRDPConfigurationItem=inRDPConfigurationItem) # Enter full screen mode with recovery scenario
time.sleep(2)
# Run CMD operations
lResult = SystemCMDRun(inSessionHexStr = inRDPConfigurationItem["SessionHex"], inCMDCommandStr = inCMDCommandStr, inModeStr = inModeStr, inClipboardTimeoutSec = inClipboardTimeoutSec, inLogger=lL)
# Exit fullscreen mode
SessionScreenSize_X_Y_W_H(inSessionHex=inSessionHex, inXInt=10, inYInt=10, inWInt=550,
inHInt=350) # Prepare little window
return lResult
# Check if session is in Full screen mode
# Return True - is in fullscreen
# example print(Connector.SessionIsFullScreen(""))
def SessionIsFullScreen(inSessionHexStr):
#Default resul
lResult = False
lWeight = GetSystemMetrics(0)
lHeight = GetSystemMetrics(1)
#Get window screen
try:
lRectangle = UIDesktop.UIOSelector_Get_UIO([{"title_re": f"{inSessionHexStr}.*", "backend": "win32"}]).rectangle()
except Exception as e:
return lResult
# Get Height/Weight
lSessionWeight = lRectangle.right - lRectangle.left
lSessionHeight = lRectangle.bottom - lRectangle.top
#Case fullscreen
if lSessionHeight == lHeight and lSessionWeight == lWeight:
lResult = True
return lResult
# Check if session is in minimized screen mode
# Return True - is in minimized
# example print(Connector.SessionIsFullScreen(""))
def SessionIsMinimizedScreen(inSessionHexStr):
#Default result
lResult = False
#Get window screen
try:
lResult = UIDesktop.UIOSelector_Get_UIO([{"title_re": f"{inSessionHexStr}.*", "backend": "win32"}]).is_minimized()
except Exception as e:
pass
return lResult
# Check if RDP session is responsible (check with random combination in cmd)
# Attention - function will be work fine if RDP will be in full screen mode!!! (see def SessionScreenFull)
# Return True - is responsible; False - is not responsible
#Type command in CMD
# inFlagDoCrossCheck: True - Do check that CMD is executed (the text response will not be available)
# inModeStr "LISTEN", "CROSSCHECK", "RUN"
# "LISTEN" - Get result of the cmd command in result TODO get home script
# "CROSSCHECK" - Check if the command was successufully sent TODO get home script
# "RUN" - Run without crosscheck and get clipboard
# inClipboardTimeoutSec # Second for wait when clipboard will changed
# return {
# "OutStr": <> # Result string
# "IsResponsibleBool": True|False # Flag is RDP is responsible - works only when inModeStr = CROSSCHECK
# }
# example Connector.SessionCMDRun("4d1e48f3ff6c45cc810ea25d8adbeb50","start notepad", "RUN")
def SystemCMDRun(inSessionHexStr, inCMDCommandStr = "echo 1", inModeStr="CROSSCHECK", inClipboardTimeoutSec = 5, inLogger = None):
lRDPWindow = None # Init the UI object
try:
lRDPWindow = UIDesktop.UIOSelector_Get_UIO([{"title_re": f"{inSessionHexStr}.*", "backend": "win32"}])
except Exception as e:
raise ConnectorExceptions.SessionWindowNotExistError() # Raise error of gui window
lL = inLogger # Alias for logger
lResult = {"OutStr": None,"IsResponsibleBool":True} # Init the result dict
lClipboardTextOld = str(random.randrange(999,9999999)) # Set random text to clipboard (for check purposes that clipboard text has been changed)
Clipboard.TextSet(lClipboardTextOld)
lCrosscheckKeyStr = str(random.randrange(999,9999999))
lRecoveryCMDResponsibleRetryIteratorInt = 0 # Init the retry iterator
lCommandIsTooBigBool = False
lCMDPostFixStr = "" # Case default "RUN"
while lRecoveryCMDResponsibleRetryIteratorInt<gRecoveryCMDResponsibleRetryCountInt: # loop for retry
# # # # # # # # # # # # # OPEN WINDOW RUN # # # # # # # # # # # # # # #
lRecoveryWindowRUNRetryIteratorInt = 0 # Init the retry iterator
while lRecoveryWindowRUNRetryIteratorInt<gRecoveryWindowRUNRetryCountInt: # loop for retry
if inModeStr == "CROSSCHECK":
#lCMDPostFixStr = f"& (echo {lCrosscheckKeyStr} | clip)"
lCMDPostFixStr = f"| (echo {lCrosscheckKeyStr} | clip)" # Bugfix async set clipboard data
elif inModeStr == "LISTEN":
lCMDPostFixStr = f"| clip"
Clipboard.TextSet("") # Clear the clipboard
KeyboardHotkey("win","r") # win+r
time.sleep(gKeyboardHotkeyDelaySecFloat)
KeyboardHotkey("ctrl","a") # Select all
KeyboardHotkey("ctrl","a") # Select all Bugfix - need double ctrl A because some OS reset selestion when ctrl A in RUN window
#time.sleep(gKeyboardHotkeyDelaySecFloat)
keyboard.send("backspace") # Delete selected all
time.sleep(gKeyboardHotkeyDelaySecFloat) # Wait for RUN window will appear ctrl+a+backspace is async - so we need some timeout...
lInputStr = f"cmd /c ({inCMDCommandStr}) {lCMDPostFixStr}" # Generate the output string for RUN window
if len(lInputStr) <= 259:
keyboard.write(lInputStr, delay=0.05) # Write new text
else:
if lL: lL.warning(
f"RDP.SystemCMDRun: ATTENTION! Your command is too big for the RUN window (len is {len(lInputStr)}). Orchestrator will send this command to the new cmd window. ")
lInputStr = "cmd"
lCommandIsTooBigBool = True
keyboard.write(lInputStr, delay=0.05) # Write cmd
time.sleep(gKeyboardHotkeyDelaySecFloat)
# Check if autocomplete
# # # # # # #
KeyboardHotkey("ctrl", "c") # Copy data
# Check the clipboard
lClipboardStr = Clipboard.TextGet() # Get text from clipboard
time.sleep(gKeyboardHotkeyDelaySecFloat)
#lL.debug(f"Clipboard text when check autocomplete:'{lClipboardStr}'")
if lClipboardStr != "": # Send backspace if
time.sleep(gKeyboardHotkeyDelaySecFloat)
keyboard.send("backspace") # Delete selected all
# # # # # # # #
KeyboardHotkey("ctrl","a") # Select all
KeyboardHotkey("ctrl","c") # Copy data
# Check the clipboard
lClipboardWaitTimeStartSec = time.time()
lClipboardStr = Clipboard.TextGet() # Get text from clipboard
while lClipboardStr != lInputStr and (time.time() - lClipboardWaitTimeStartSec) <= inClipboardTimeoutSec:
lClipboardStr = Clipboard.TextGet() # Get text from clipboard
time.sleep(0.5) # wait some time for the next operation
if lClipboardStr == lInputStr: # Cross check the clipboard data and input string
lRecoveryWindowRUNRetryIteratorInt = gRecoveryWindowRUNRetryCountInt # Set final count to block the loop
else: # Failed - wait and retry
if lL: lL.warning(f"RDP::SystemCMDRun: RUN window (win+r) doesn't appear. Wait for {gRecoveryWindowRUNRetryIntervalSecInt}[s.] and retry. Current retry iterator is {lRecoveryWindowRUNRetryIteratorInt}. CMD Str: {lInputStr}") # Log the error
lRecoveryWindowRUNRetryIteratorInt = lRecoveryWindowRUNRetryIteratorInt + 1 # Increment the iterator
if lRecoveryWindowRUNRetryIteratorInt == gRecoveryWindowRUNRetryCountInt:
if lL: lL.warning(f"RDP::SystemCMDRun: RUN window (win+r) retry count is over. Raise the error. CMD Str: {lInputStr}") # Log the error
raise ConnectorExceptions.RUNExistError() # Raise the error
time.sleep(gRecoveryWindowRUNRetryIntervalSecInt) # wait for some seconds before new iteration
# # # # # # # # # # # # # # # # # # # # # # # # # # # #
# # # # # # # # # # # # # RUN CMD # # # # # # # # # # # # # # #
if inModeStr == "LISTEN": # if mode == LISTEN - set random number in clipboard
Clipboard.TextSet(lClipboardTextOld) #
time.sleep(0.5) # wait some time for the next operation
keyboard.press_and_release('enter') # Execute CMD
time.sleep(1)
if lCommandIsTooBigBool == True:
# Case when string is tool big - call cmd and then type text into
#keyboard.write("cmd") # Open cmd
#time.sleep(1)
#keyboard.press_and_release('enter') # Execute CMD
keyboard.write(f"(({inCMDCommandStr}) {lCMDPostFixStr}) && exit", delay=0.05) # send command
time.sleep(1)
keyboard.press_and_release('enter') # Execute command
if inModeStr == "CROSSCHECK" or inModeStr == "LISTEN": # Get OutStr (Case CROSSCHECK and LISTEN)
lClipboardWaitTimeStartSec = time.time()
lResult["OutStr"] = Clipboard.TextGet() # Get text from clipboard
while lResult["OutStr"].startswith(lClipboardTextOld) and (time.time() - lClipboardWaitTimeStartSec) <= inClipboardTimeoutSec:
lResult["OutStr"] = Clipboard.TextGet() # Get text from clipboard
time.sleep(0.5) # wait some time for the next operation
if lResult["OutStr"] == lClipboardTextOld: # If value hasn't been changed - retry send
lRecoveryCMDResponsibleRetryIteratorInt = lRecoveryCMDResponsibleRetryIteratorInt + 1 # increment the iterator
if lL: lL.warning(f"RDP::SystemCMDRun: CMD command doesn't been executed (no changes in clipboard data). Wait for {gRecoveryCMDResponsibleRetryIntervalSecInt}[s.] and retry from start window RUN. Current retry iterator is {lRecoveryCMDResponsibleRetryIteratorInt}. CMD Str: {lInputStr}, Clipboard data: {lResult['OutStr']}") # Log the error
if lRecoveryCMDResponsibleRetryIteratorInt >= gRecoveryCMDResponsibleRetryCountInt: # raise the error if retry count is exceeded
if lL: lL.warning(f"RDP::SystemCMDRun: CMD command retry count is over. Raise the error.") # Log the error
raise ConnectorExceptions.CMDResponsibleError() # Raise the error
time.sleep(gRecoveryCMDResponsibleRetryIntervalSecInt) # wait for some seconds before new iteration
else: # Data was recieved - do crosscheck
if inModeStr == "CROSSCHECK":
if lResult["OutStr"] == f"{lCrosscheckKeyStr} \r\n\x00\x00\x00\x00\x00": # it is ok - do futher check
lResult["IsResponsibleBool"] = True
lRecoveryCMDResponsibleRetryIteratorInt = gRecoveryCMDResponsibleRetryCountInt # turn off the iterator
else:
lResult["IsResponsibleBool"] = False
lRecoveryCMDResponsibleRetryIteratorInt = lRecoveryCMDResponsibleRetryIteratorInt + 1 # increment the iterator
if lL: lL.warning(f"RDP::SystemCMDRun: CMD command doesn't been executed (wrong clipboard data). Wait for {gRecoveryCMDResponsibleRetryIntervalSecInt}[s.] and retry from start window RUN. Current retry iterator is {lRecoveryCMDResponsibleRetryIteratorInt}. CMD Str: {lInputStr}, Clipboard data: {lResult['OutStr']}") # Log the error
if lRecoveryCMDResponsibleRetryIteratorInt >= gRecoveryCMDResponsibleRetryCountInt: # raise the error if retry count is exceeded
if lL: lL.warning(f"RDP::SystemCMDRun: CMD command retry count is over. Raise the error.") # Log the error
raise ConnectorExceptions.CMDResponsibleError() # Raise the error
time.sleep(gRecoveryCMDResponsibleRetryIntervalSecInt) # wait for some seconds before new iteration
else: # clipboard data has been changed but mode is not crosscheck - return success from function
lRecoveryCMDResponsibleRetryIteratorInt = gRecoveryCMDResponsibleRetryCountInt # turn off the iterator
else: # Success - no cross validation is aaplicable
lRecoveryCMDResponsibleRetryIteratorInt = gRecoveryCMDResponsibleRetryCountInt # turn off the iterator
# # # # # # # # # # # # # # # # # # # # # # # # # # # #
return lResult # return the result
# Check if current RDP is responsible
def SystemRDPIsResponsible(inSessionHexStr):
return SystemCMDRun(inSessionHexStr = inSessionHexStr, inCMDCommandStr = "echo 1", inModeStr="CROSSCHECK")["IsResponsibleBool"]
# Click OK on error messages
def SystemRDPWarningClickOk():
# Try to click OK Error window in RUS version
while UIDesktop.UIOSelector_Exist_Bool([{"title": "Подключение к удаленному рабочему столу", "class_name": "#32770", "backend": "win32"},
{"title": "ОК", "class_name": "Button"}]):
try:
UIDesktop.UIOSelector_Get_UIO([{"title": "Подключение к удаленному рабочему столу", "class_name": "#32770", "backend": "win32"},
{"title": "ОК", "class_name": "Button"}]).click()
except Exception as e:
pass
# Try to click OK Error window in ENG version
while UIDesktop.UIOSelector_Exist_Bool([{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
{"title": "OK", "class_name": "Button"}]):
try:
UIDesktop.UIOSelector_Get_UIO([{"title": "Remote Desktop Connection", "class_name": "#32770", "backend": "win32"},
{"title": "OK", "class_name": "Button"}]).click()
except Exception as e:
pass
# Bugfix with RDP work
def KeyboardHotkey(inModifierKeyStr,inKeyStr, inDelaySecFloat=gKeyboardHotkeyDelaySecFloat):
keyboard.press(inModifierKeyStr)
time.sleep(inDelaySecFloat)
keyboard.send(inKeyStr)
time.sleep(inDelaySecFloat)
keyboard.release(inModifierKeyStr)
time.sleep(inDelaySecFloat)