@ -10,6 +10,7 @@ import keyboard # Keyboard functions
import time
import random # random integers
from win32api import GetSystemMetrics # Get Screen rect
import pyautogui # for hotkey operations
# System variables for recovery scenario
gRecoveryWindowRDPRetryCountInt = 3 # Retry iteration count is RDP window is not responsible
gRecoveryWindowRDPRetryIntervalSecInt = 3 # Retry interval for reconnect
@ -34,7 +35,7 @@ gRecoveryCMDResponsibleRetryIntervalSecInt = 3 # Retry interval for retry
}
}
"""
def Session ( inRDPSessionConfiguration ):
def Session ( inRDPSessionConfiguration , inScreenSize550x350Bool = False ):
#RDPConnector.SessionConnect(mConfiguration)
#RDPConnector.LoginPassSet("111.222.222.111","ww","dd")
( lRDPFile , lSessionHex ) = SessionConfigurationCreate ( inRDPSessionConfiguration )
@ -43,7 +44,7 @@ def Session(inRDPSessionConfiguration):
#Set login/password
SessionLoginPasswordSet ( inRDPSessionConfiguration [ " Host " ] , inRDPSessionConfiguration [ " Login " ] , inRDPSessionConfiguration [ " Password " ] )
#Start session
SessionRDPStart ( lRDPFile )
SessionRDPStart ( lRDPFile , inScreenSize550x350Bool = inScreenSize550x350Bool )
#Remove temp file
time . sleep ( 4 ) #Delete file after some delay - one way to delete and run the RDP before because RDP is not read file in one moment
os . remove ( lRDPFile ) # delete the temp rdp
@ -89,7 +90,7 @@ def SessionConfigurationCreate(inConfiguration):
#Return .rdp full path
return ( lRDPCurrentFileFullPath , ( lRDPCurrentFileFullPath . split ( " \\ " ) [ - 1 ] ) [ 0 : - 4 ] )
#RDPSessionStart
def SessionRDPStart ( inRDPFilePath ):
def SessionRDPStart ( inRDPFilePath , inScreenSize550x350Bool = False ):
#Disable certificate warning
lCMDString = ' reg add " HKEY_CURRENT_USER \\ Software \\ Microsoft \\ Terminal Server Client " /v " AuthenticationLevelOverride " /t " REG_DWORD " /d 0 /f '
os . system ( lCMDString )
@ -147,24 +148,27 @@ def SessionRDPStart(inRDPFilePath):
# Raise exception if RDP is not active
if len ( lWaitResult ) == 0 :
raise ConnectorExceptions . SessionWindowNotExistError ( " Error when initialize the RDP session - No RDP windows has appreared! " )
# Wait for init
time . sleep ( 3 )
SessionScreenSize_X_Y_W_H ( inSessionHex = lRDPFileName , inXInt = 10 , inYInt = 10 , inWInt = 550 , inHInt = 350 ) #Prepare little window
time . sleep ( 3 ) # Wait for init
if inScreenSize550x350Bool : SessionScreenSize_X_Y_W_H ( inSessionHex = lRDPFileName , inXInt = 10 , inYInt = 10 , inWInt = 550 , inHInt = 350 ) #Prepare little window
return None
#Set fullscreen for app
def SessionScreenFull ( inSessionHex , in GSettings = None ) :
def SessionScreenFull ( inSessionHex , in Logger = None , inRDPConfigurationItem = None ) :
########################################
lWindowRDPRetryIterator = 0 # Retry iterator if RDP window is not active
lRDPConfigurationItem = None
lL = None
if inGSettings : # Get the values from gSettings
lRDPConfigurationItem = inGSettings [ " RobotRDPActive " ] [ " RDPList " ] [ inSessionHex ] # Get the RDP configuration item
lL = inGSettings [ " Logger " ] # Get the logger instance
lRDPConfigurationItem = inRDPConfigurationItem # Get the RDP configuration item
lL = inLogger # Get the logger instance
lRDPWindow = None # Init the variable
while lWindowRDPRetryIterator < gRecoveryWindowRDPRetryCountInt : # Loop iteration to connect to RDP
try : # Try to get RDP window
lRDPWindow = UIDesktop . UIOSelector_Get_UIO ( [ { " title_re " : f " { inSessionHex } .* " , " backend " : " win32 " } ] )
lWindowRDPRetryIterator = gRecoveryWindowRDPRetryCountInt # Set last iterator to turn off the loop
# RDP window has been detected - go to set focus and maximize
lRDPWindow . set_focus ( )
lRDPWindow . maximize ( )
if not SessionIsFullScreen ( inSessionHex ) :
lRDPWindow . type_keys ( " ^ % {BREAK} " )
time . sleep ( 0.5 )
except Exception as e : # RDP window is not exist - try to reconnect
if lL : lL . warning ( f " RDP::SessionScreenFull: RDP window is not exist - try sleep { gRecoveryWindowRDPRetryIntervalSecInt } [s.] and then to reconnect. SessionHex: { inSessionHex } . Current retry iterator is { lWindowRDPRetryIterator } " )
if lRDPConfigurationItem :
@ -172,6 +176,8 @@ def SessionScreenFull(inSessionHex, inGSettings = None):
# Try to reconnect the RDP
SessionClose ( inSessionHexStr = inSessionHex ) # Close the
Session ( lRDPConfigurationItem ) # Create RDP session
inSessionHex = lRDPConfigurationItem [ " SessionHex " ] # Get new session hex after reconnect
SystemRDPWarningClickOk ( ) # Click all warning messages
lWindowRDPRetryIterator = lWindowRDPRetryIterator + 1 # increase the iterator
if lWindowRDPRetryIterator > = gRecoveryWindowRDPRetryCountInt : # Raise the error if retry count is over
if lL : lL . warning ( f " RDP::SessionScreenFull: Retry count is over. Raise the error. SessionHex: { inSessionHex } . " ) # Log the info
@ -179,13 +185,6 @@ def SessionScreenFull(inSessionHex, inGSettings = None):
else :
if lL : lL . warning ( f " RDP::SessionScreenFull: Has no RDP configuration item - don ' t reconnect. Raise the error. SessionHex: { inSessionHex } " ) # Log the info
raise ConnectorExceptions . SessionWindowNotExistError ( ) # raise the error
# RDP window has been detected - go to set focus and maximize
lRDPWindow . set_focus ( )
lRDPWindow . maximize ( )
#time.sleep(0.5)
if not SessionIsFullScreen ( inSessionHex ) :
lRDPWindow . type_keys ( " ^ % {BREAK} " )
time . sleep ( 0.5 )
return None
# Set the screen size
@ -233,16 +232,14 @@ def SessionClose(inSessionHexStr):
# "IsResponsibleBool": True|False # Flag is RDP is responsible - works only when inModeStr = CROSSCHECK
# }
# example Connector.SessionCMDRun("4d1e48f3ff6c45cc810ea25d8adbeb50","start notepad", "RUN")
def SessionCMDRun ( inSessionHex , inCMDCommandStr = " echo 1 " , inModeStr = " CROSSCHECK " , inClipboardTimeoutSec = 5 , inGSettings = None ) :
lL = None # Init the logger
if inGSettings : # If gSettings is exist
lL = inGSettings [ " Logger " ] # get the logger
def SessionCMDRun ( inSessionHex , inCMDCommandStr = " echo 1 " , inModeStr = " CROSSCHECK " , inClipboardTimeoutSec = 5 , inLogger = None , inRDPConfigurationItem = None ) :
lL = inLogger # Init the logger
# Init the result dict
lResult = { " OutStr " : None , " IsResponsibleBool " : True }
SessionScreenFull ( inSessionHex , in GSettings = inGSettings ) # Enter full screen mode with recovery scenario
SessionScreenFull ( inSessionHex , in Logger= lL , inRDPConfigurationItem = inRDPConfigurationItem ) # Enter full screen mode with recovery scenario
time . sleep ( 2 )
# Run CMD operations
lResult = SystemCMDRun ( in CMDCommandStr = inCMDCommandStr , inModeStr = inModeStr , inClipboardTimeoutSec = inClipboardTimeoutSec , inLogger = lL )
lResult = SystemCMDRun ( in SessionHexStr = inRDPConfigurationItem [ " SessionHex " ] , in CMDCommandStr = inCMDCommandStr , inModeStr = inModeStr , inClipboardTimeoutSec = inClipboardTimeoutSec , inLogger = lL )
# Exit fullscreen mode
SessionScreenSize_X_Y_W_H ( inSessionHex = inSessionHex , inXInt = 10 , inYInt = 10 , inWInt = 550 ,
inHInt = 350 ) # Prepare little window
@ -294,7 +291,12 @@ def SessionIsMinimizedScreen(inSessionHexStr):
# "IsResponsibleBool": True|False # Flag is RDP is responsible - works only when inModeStr = CROSSCHECK
# }
# example Connector.SessionCMDRun("4d1e48f3ff6c45cc810ea25d8adbeb50","start notepad", "RUN")
def SystemCMDRun ( inCMDCommandStr = " echo 1 " , inModeStr = " CROSSCHECK " , inClipboardTimeoutSec = 5 , inLogger = None ) :
def SystemCMDRun ( inSessionHexStr , inCMDCommandStr = " echo 1 " , inModeStr = " CROSSCHECK " , inClipboardTimeoutSec = 5 , inLogger = None ) :
lRDPWindow = None # Init the UI object
try :
lRDPWindow = UIDesktop . UIOSelector_Get_UIO ( [ { " title_re " : f " { inSessionHexStr } .* " , " backend " : " win32 " } ] )
except Exception as e :
raise ConnectorExceptions . SessionWindowNotExistError ( ) # Raise error of gui window
lL = inLogger # Alias for logger
lResult = { " OutStr " : None , " IsResponsibleBool " : True } # Init the result dict
lClipboardTextOld = str ( random . randrange ( 999 , 9999999 ) ) # Set random text to clipboard (for check purposes that clipboard text has been changed)
@ -307,18 +309,19 @@ def SystemCMDRun(inCMDCommandStr = "echo 1", inModeStr="CROSSCHECK", inClipboard
while lRecoveryWindowRUNRetryIteratorInt < gRecoveryWindowRUNRetryCountInt : # loop for retry
lCMDPostFixStr = " " # Case default "RUN"
if inModeStr == " CROSSCHECK " :
lCMDPostFixStr = f " | echo { lCrosscheckKeyStr } | clip "
lCMDPostFixStr = f " & ( echo { lCrosscheckKeyStr } | clip ) "
elif inModeStr == " LISTEN " :
lCMDPostFixStr = f " | clip "
keyboard . press_and_release ( ' win+r ' )
time . sleep ( 1 ) # Wait for RUN window will appear
keyboard. press_and_release ( " ctrl+a " ) # Select all
lRDPWindow. type_keys ( " ^(a) " ) # Select all
keyboard . press_and_release ( " backspace " ) # Delete selected all
lInputStr = f " cmd /c { inCMDCommandStr } { lCMDPostFixStr } " # Generate the output string for RUN window
time . sleep ( 0.5 ) # Wait for RUN window will appear ctrl+a+backspace is async - so we need some timeout...
lInputStr = f " cmd /c ( { inCMDCommandStr } ) { lCMDPostFixStr } " # Generate the output string for RUN window
keyboard . write ( lInputStr ) # Write new text
keyboard. press_and_release ( " ctrl+a " ) # Select all
keyboard. press_and_release ( " ctrl+c " ) # COPY in clipboard to check
#time.sleep(2 + 2 * lRecoveryWindowRUNRetryIteratorInt) # Wait when data will be copied UPD - no need - wait is below
time. sleep ( 0.5 )
lRDPWindow. type_keys ( " ^(a) " ) # Select all
lRDPWindow . type_keys ( " ^(c) " ) # Copy data
# Check the clipboard
lClipboardWaitTimeStartSec = time . time ( )
lClipboardStr = Clipboard . TextGet ( ) # Get text from clipboard
@ -370,8 +373,8 @@ def SystemCMDRun(inCMDCommandStr = "echo 1", inModeStr="CROSSCHECK", inClipboard
# # # # # # # # # # # # # # # # # # # # # # # # # # # #
return lResult # return the result
# Check if current RDP is responsible
def SystemRDPIsResponsible ( ) :
return SystemCMDRun ( in CMDCommandStr = " echo 1 " , inModeStr = " CROSSCHECK " ) [ " IsResponsibleBool " ]
def SystemRDPIsResponsible ( inSessionHexStr ) :
return SystemCMDRun ( in SessionHexStr = inSessionHexStr , in CMDCommandStr = " echo 1 " , inModeStr = " CROSSCHECK " ) [ " IsResponsibleBool " ]
# Click OK on error messages
def SystemRDPWarningClickOk ( ) :
# Try to click OK Error window in RUS version