import psutil
import datetime
import logging
import sys # stdout fro logging
def RenderRobotR01(inGlobalConfiguration):
#Subheader Variants
lSubheaderRunTrueText="Состояние: Работает"
lSubheaderRunFalseText="Состояние: Не работает"
#Run button
#Такое большое количество слэшей связано с тем, что этот текст отправляется сначала в браузер, рендерится там, а потом отправляется на процессор оркестратора
lOnClickRunButton="""mGlobal.Controller.CMDRunText('C:\\test.png');"""
#Force close button
lOnClickForceCloseButton="""mGlobal.Controller.CMDRunText('taskkill /F /im Robot_R01.exe');"""
#Result template
lResultDict={
"HeaderLeftText":"Автозагрузка заявок на расход",
"HeaderRightText":"R01",
"DataStorageKey":"R01_Data", #Use key for set current dict in mGlobal.DataStorage["DtaaStorageKey"] on client side
"SubheaderText":lSubheaderRunFalseText,
"BodyKeyValueList":[
#Дата запуска: 10:00:09 01.09.2019
{"Key":"Дата запуска","Value":"Не запущен"},
{"Key":"Текущий шаг","Value":"Не запущен"},
{"Key":"Время выполнения шага","Value":"--с."},
{"Key":"Отчет робота","Value":"Скачать"}
],
"FooterText":"Дата изменения: 9:38:00 09.10.2019",
"FooterButtonX2List":[
{"Text":"Ручной запуск", "Color":"green", "Link":"", "OnClick": lOnClickRunButton.replace("\\","\\\\")},
{"Text":"Безопасная остановка", "Color disabled":"orange", "Link":""}
],
"FooterButtonX1List":[
{"Text":"Принудительная остановка", "Color":"red", "Link":"", "OnClick": lOnClickForceCloseButton.replace("\\","\\\\")}
]
}
#Check if process running
if CheckIfProcessRunning("Robot_R01"):
lResultDict["SubheaderText"]=lSubheaderRunTrueText
#Fill robot info from Storage - R01
if "Robot_R01" in inGlobalConfiguration.get("Storage",{}):
lItemDict=inGlobalConfiguration["Storage"]["Robot_R01"]
#lResultDict["HeaderLeftText"]=lItemDict["Name"]
#lResultDict["HeaderRightText"]=lItemDict["Code"]
lResultDict["BodyKeyValueList"][0]["Value"]=lItemDict.get("RunDateTimeString","Не запущен")
lResultDict["BodyKeyValueList"][1]["Value"]=lItemDict.get("StepCurrentName","Не запущен")
lResultDict["BodyKeyValueList"][2]["Value"]=lItemDict.get("StepCurrentDuration","--с.")
else:
#Process not running
lResultDict["FooterText"]=f'Дата изменения: {datetime.datetime.now().strftime("%H:%M:%S %d.%m.%Y")}'
else:
#Process not running
lResultDict["FooterText"]=f'Дата изменения: {datetime.datetime.now().strftime("%H:%M:%S %d.%m.%Y")}'
return lResultDict
def CheckIfProcessRunning(processName):
'''
Check if there is any running process that contains the given name processName.
'''
#Iterate over the all the running process
for proc in psutil.process_iter():
try:
# Check if process name contains the given name string.
if processName.lower() in proc.name().lower():
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return False;
#Orchestrator settings
def Settings():
import os
import pyOpenRPA.Orchestrator
lOrchestratorFolder = "\\".join(pyOpenRPA.Orchestrator.__file__.split("\\")[:-1])
mDict = {
"Server": {
"ListenPort_": "Порт, по которому можно подключиться к демону",
"ListenPort": 80,
"ListenURLList": [
{
"Description": "Local machine test",
"URL_": "Сетевое расположение сервера демона",
"URL": ""
}
],
"AccessUsers": { #Default - all URL is blocked
"FlagCredentialsAsk": True, #Turn on Authentication
"RuleDomainUserDict": {
#("DOMAIN", "USER"): { !!!!! only in upper case !!!!
# "MethodMatchURLBeforeList": [
# {
# "Method":"GET|POST",
# "MatchType":"BeginWith|Contains|Equal|EqualCase",
# "URL":"",
# "FlagAccessDefRequestGlobalAuthenticate": None, #Return bool
# "FlagAccess": True
# }
# ],
# "ControlPanelKeyAllowedList":[] # If empty - all is allowed
#}
},
"RuleMethodMatchURLBeforeList": [ #General MethodMatchURL list (no domain/user)
# {
# "Method":"GET|POST",
# "MatchType":"BeginWith|Contains|Equal|EqualCase",
# "URL":"",
# "FlagAccessDefRequestGlobalAuthenticate": None, #Return bool
# "FlagAccess": True
# }
],
"AuthTokensDict": {
#"":{"User":"", "Domain":"", "TokenDatetime":, "FlagDoNotExpire":True}
}
},
"URLList":[ #List of available URLs with the orchestrator server
#{
# "Method":"GET|POST",
# "URL": "/index", #URL of the request
# "MatchType": "", #"BeginWith|Contains|Equal|EqualCase",
# "ResponseFilePath": "", #Absolute or relative path
# "ResponseFolderPath": "", #Absolute or relative path
# "ResponseContentType": "", #HTTP Content-type
# "ResponseDefRequestGlobal": None #Function with str result
#}
{
"Method":"GET",
"URL": "/test/", #URL of the request
"MatchType": "BeginWith", #"BeginWith|Contains|Equal|EqualCase",
#"ResponseFilePath": "", #Absolute or relative path
"ResponseFolderPath": "C:\Abs\Archive\scopeSrcUL\OpenRPA\Orchestrator\Settings", #Absolute or relative path
#"ResponseContentType": "", #HTTP Content-type
#"ResponseDefRequestGlobal": None #Function with str result
}
]
},
"OrchestratorStart":{
"ActivityList":[
#{
# "Type": "ProcessStop", #Activity type
# "Name": "OpenRPARobotDaemon.exe", #Process name
# "FlagForce": True, #Force process close
# "User": "%username%" #Empty, user or %username%
#},
#{
# "Type": "ProcessStartIfTurnedOff", #Activity type
# "CheckTaskName": "notepad.exe", #Python function module name
# "Path": "notepad", #Python function name
# "ArgList": [] #Input python function args
#},
# {
# "Type": "RDPSessionConnect", #Activity type - start/connect RDP Session
# "RDPSessionKeyStr": "notepad.exe", #Python function module name
# "RDPConfigurationDict": {}
# },
# {
# "Type": "RDPSessionLogoff", #Activity type - logoff RDP Session
# "RDPSessionKeyStr": "notepad.exe", #Python function module name
# },
# {
# "Type": "RDPSessionDisconnect", #Activity type - disconnect the RDP Session without logoff
# "RDPSessionKeyStr": "notepad.exe", #Python function module name
# },
# {
# "Type": "RDPSessionFileSend", #Activity type - send file to RDP session
# ...
# },
# {
# "Type": "RDPSessionFileRecieve", #Activity type - recieve file from rdp session
# ...
# },
# {
# "Type": "RDPSessionProcessStart", #Activity type -
# ...
# },
]
},
"Scheduler": {
"ActivityTimeCheckLoopSeconds":5, #Количество секунд, между циклами проверки действий
"ActivityTimeList": [
{
"TimeHH:MM": "22:23", #Time [HH:MM] to trigger activity
"WeekdayList": [1,2,3,4,5,6,7], #List of the weekday index when activity is applicable, Default [1,2,3,4,5,6,7]
"Activity":{
"Type": "ProcessStart", #Activity type
"Path": "start", #Executable file path
"ArgList": ["cmd.exe","/c","PIPUpgrade.cmd"] #List of the arguments
}
},
{
"TimeHH:MM": "19:20", #Time [HH:MM] to trigger activity
"WeekdayList": [1, 2, 3], #List of the weekday index when activity is applicable, Default [1,2,3,4,5,6,7]
"Activity":{
"Type": "ProcessStop", #Activity type
"Name": "OpenRPARobotDaemon.exe", #Process name
"FlagForce": True, #Force process close
"User": "%username%" #Empty, user or %username%
}
},
{
"TimeHH:MMStart": "12:40", #Time [HH:MM] to trigger activity
"TimeHH:MMStop": "12:40",
"ActivityIntervalSeconds": 2,
"WeekdayList": [1, 2, 3, 4, 5, 6, 7], #List of the weekday index when activity is applicable, Default [1,2,3,4,5,6,7]
"Activity":{
"Type": "ProcessStartIfTurnedOff", #Activity type
"CheckTaskName": "notepad.exe", #Python function module name
"Path": "notepad", #Python function name
"ArgList": [] #Input python function args
}
}
],
"LogList": []
},
"Processor": {
"LogType_CMDStart": True, #LogType_: if Trace for command is selected for False, the tracing will be off for such activity type. Default True
"LogList": [] #List of processor activity, which was executed. Fill list when orchestrator is running
},
"ControlPanelDict": {
"RefreshSeconds": 5,
"RobotList": [
{
"RenderFunction": RenderRobotR01,
"KeyStr":"TestControlPanelKey"
}
]
},
# # # # # # # # # # # # # #
"RobotRDPActive":{
"RDPList": {
"RDPSessionKey":{
"Host": "77.77.22.22", # Host address
"Port": "3389", # RDP Port
"Login": "test", # Login
"Password": "test", # Password
"Screen": {
"Width": 1680, # Width of the remote desktop in pixels
"Height": 1050, # Height of the remote desktop in pixels
# "640x480" or "1680x1050" or "FullScreen". If Resolution not exists set full screen
"FlagUseAllMonitors": False, # True or False
"DepthBit": "32" # "32" or "24" or "16" or "15"
},
"SharedDriveList": ["c"], # List of the Root sesion hard drives
###### Will updated in program ############
"SessionHex": "", # Hex is created when robot runs
"SessionIsWindowExistBool": False, # Flag if the RDP window is exist, old name "FlagSessionIsActive". Check every n seconds
"SessionIsWindowResponsibleBool": False, # Flag if RDP window is responsible (recieve commands). Check every nn seconds. If window is Responsible - window is Exist too
"SessionIsIgnoredBool": False # Flag to ignore RDP window False - dont ignore, True - ignore
}
},
"ResponsibilityCheckIntervalSec": None,
# Seconds interval when Robot check the RDP responsibility. if None - dont check
"FullScreenRDPSessionKeyStr": None, # RDPSessionKeyStr of the current session which is full screened, None is no session in fullscreen
"ActivityList":[ # Technical Activity list for RobotRDPActive thread - equal to Main activity list, apply only RDP activity
# {
# "DefNameStr":"test", # Function name in RobotRDPActive.Processor
# "ArgList":[1,2,3], # Args list
# "ArgDict":{"ttt":1,"222":2,"dsd":3} # Args dictionary
# },
#{
# "DefNameStr": "RDPSessionConnect", # Function name in RobotRDPActive.Processor
# "ArgList": [], # Args list
# "ArgDict": {"inRDPSessionKeyStr": "TestRDP", "inHostStr": "77.44.33.22", "inPortStr": "3389",
# "inLoginStr": "login", "inPasswordStr": "pass"} # Args dictionary
#},
# {
# "DefNameStr": "RDPSessionDisconnect", # Disconnect the RDP session without logoff. Function name in RobotRDPActive.Processor
# "ArgList": [], # Args list
# "ArgDict": {"inRDPSessionKeyStr": "TestRDP"}
# },
# {
# "DefNameStr": "RDPSessionReconnect", # Disconnect the RDP session without logoff. Function name in RobotRDPActive.Processor
# "ArgList": [], # Args list
# "ArgDict": {"inRDPSessionKeyStr": "TestRDP"}
# }
]
},
# # # # # # # # # # # # # #
"FileManager": {
"FileURLFilePathDict_help": "https://localhost:8081/filemanager/. All FileURL s must be set in lowercase",
"FileURLFilePathDict": {
"r01/report.xlsx": "C:\\RPA\\R01_IntegrationOrderOut\\Data\\Reestr_otgruzok.xlsx"
}
},
"Logger": logging.getLogger("Orchestrator"),
"Storage": {
"Robot_R01_help": "Robot data storage in orchestrator env",
"Robot_R01": {},
"R01_OrchestratorToRobot":{"Test2":"Test2"}
}
}
#Создать файл логирования
# add filemode="w" to overwrite
if not os.path.exists("Reports"):
os.makedirs("Reports")
##########################
#Подготовка логгера Robot
#########################
mRobotLogger=mDict["Logger"]
mRobotLogger.setLevel(logging.INFO)
# create the logging file handler
mRobotLoggerFH = logging.FileHandler("Reports\ReportOrchestrator_"+datetime.datetime.now().strftime("%Y_%m_%d")+".log")
mRobotLoggerFormatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
mRobotLoggerFH.setFormatter(mRobotLoggerFormatter)
# add handler to logger object
mRobotLogger.addHandler(mRobotLoggerFH)
####################Add console output
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(mRobotLoggerFormatter)
mRobotLogger.addHandler(handler)
############################################
###################################
#Init .py files from Settings folder
####################################
#Get file list from Settings folder
import os
import pdb
#lFunction to call in subfiles
lSubmoduleFunctionName = "SettingsUpdate"
#lSettingsPath = os.path.join(inSettingsFolderPath, "Settings")
lSettingsPath = "\\".join(os.path.join(os.getcwd(),__file__).split("\\")[:-1])
#lSettingsPath = os.path.join(os.getcwd(), "Settings")
#Lambda function to get files .py from settings folder except Settings.py
lFileList = [f for f in os.listdir(lSettingsPath) if os.path.isfile(os.path.join(lSettingsPath, f)) and f.split(".")[-1] == "py" and os.path.join(lSettingsPath, f) != __file__]
import importlib.util
for lModuleFilePathItem in lFileList:
lModuleName = lModuleFilePathItem[0:-3]
lFileFullPath = os.path.join(lSettingsPath, lModuleFilePathItem)
lTechSpecification = importlib.util.spec_from_file_location(lModuleName, lFileFullPath)
lTechModuleFromSpec = importlib.util.module_from_spec(lTechSpecification)
lTechSpecificationModuleLoader = lTechSpecification.loader.exec_module(lTechModuleFromSpec)
if lSubmoduleFunctionName in dir(lTechModuleFromSpec):
#Run SettingUpdate function in submodule
getattr(lTechModuleFromSpec, lSubmoduleFunctionName)(mDict)
return mDict