import psutil , datetime , logging , os , sys
# Config settings
# Folder holding the pyOpenRPA sources used for testing; set to "" to use the installed package.
lPyOpenRPASourceFolderPathStr = r"../Sources"

# Operations
# Put the test sources first on sys.path so that later pyOpenRPA imports resolve to them.
if lPyOpenRPASourceFolderPathStr:
    sys.path.insert(0, os.path.abspath(lPyOpenRPASourceFolderPathStr))
# Import pyOpenRPA only after its source folder has been configured above
from pyOpenRPA . Orchestrator import SettingsTemplate # Import functionallity
from pyOpenRPA . Tools import CrossOS
from pyOpenRPA import Orchestrator # Import orchestrator main
from pyOpenRPA . Orchestrator . Server import app
import threading
from fastapi import Depends
from fastapi . responses import PlainTextResponse
from fastapi . responses import FileResponse
# Serve the files related to the HR officer robot (HR_officer01)
@app.get(path="/HR_officer01/questions", tags=["HR_officer01"])
def get_file():
    # Serve the HR_officer01 questions CSV; CrossOS.PathStr is applied to the
    # backslash-separated relative path before it is handed to FileResponse.
    return FileResponse(CrossOS.PathStr("Demo\\HR_officer01\\Questions.csv"))
@app.get(path="/HR_officer01/results", tags=["HR_officer01"])
def get_file():
    # Serve the HR_officer01 aggregated results CSV; CrossOS.PathStr is applied
    # to the backslash-separated relative path before it is handed to FileResponse.
    return FileResponse(CrossOS.PathStr("Demo\\HR_officer01\\All_results.csv"))
@app.get(path="/HR_officer01/logs", tags=["HR_officer01"])
def get_file():
    # Serve the HR_officer01 log file; CrossOS.PathStr is applied to the
    # backslash-separated relative path before it is handed to FileResponse.
    return FileResponse(CrossOS.PathStr("Demo\\HR_officer01\\log.txt"))
@app.get(path="/HR_officer01/scripts", tags=["HR_officer01"])
def get_file():
    # Serve the HR_officer01 front-end script; CrossOS.PathStr is applied to the
    # backslash-separated relative path before it is handed to FileResponse.
    return FileResponse(CrossOS.PathStr("Demo\\HR_officer01\\HR_officer01.js"))
@app.get(path="/HR_officer01/styles", tags=["HR_officer01"])
def get_file():
    # Serve the HR_officer01 stylesheet; CrossOS.PathStr is applied to the
    # backslash-separated relative path before it is handed to FileResponse.
    return FileResponse(CrossOS.PathStr("Demo\\HR_officer01\\HR_officer01.css"))
# Serve the files related to the treasurer robot (Treasurer01)
@app.get(path="/Treasurer01/scripts", tags=["Treasurer01"])
def get_file():
    # Serve the Treasurer01 front-end script. The path goes through
    # CrossOS.PathStr, exactly like the HR_officer01 handlers in this file,
    # instead of passing the raw backslash-separated string to FileResponse.
    return FileResponse(CrossOS.PathStr("Demo\\Treasurer01\\Treasurer01.js"))
# Example of creating a server-side (FastAPI) function: /test/threads
@app.get(path="/test/threads", tags=["Test"], response_class=PlainTextResponse)
def Threads():  # inAuthDict:dict=Depends(IdentifyAuthorize)
    # def Threads(inAuthDict:dict=Depends(IdentifyAuthorize)):  # Use this signature if authorization is required
    # Return a plain-text report with one "ПОТОК: <name>" line for every thread
    # currently reported by threading.enumerate().
    return "".join(f"ПОТОК: {thread.name}\n" for thread in threading.enumerate())
# Run as administrator (ONLY FOR WINDOWS)
if not Orchestrator.OrchestratorIsAdmin() and CrossOS.IS_WINDOWS_BOOL:
    # Not elevated on Windows: request a re-run as administrator and configure nothing here.
    Orchestrator.OrchestratorRerunAsAdmin()
    print("Orchestrator will be run as administrator!")
else:
    gSettings = Orchestrator.GSettingsGet()
    # gSettings = SettingsTemplate.Create(inModeStr="BASIC")  # Create GSettings with basic configuration - no more config is available from the box - you can create own
    Orchestrator.OrchestratorLoggerGet().setLevel(logging.INFO)

    # TEST Add users ND and rpa00 - both logins receive the admin UAC hierarchy
    lUACClientDict = SettingsTemplate.__UACClientAdminCreate__()
    Orchestrator.UACUpdate(
        inGSettings=gSettings,
        inADLoginStr="ND",
        inADStr="",
        inADIsDefaultBool=True,
        inURLList=[],
        inRoleHierarchyAllowedDict=lUACClientDict,
    )
    Orchestrator.UACUpdate(
        inGSettings=gSettings,
        inADLoginStr="rpa00",
        inADStr="",
        inADIsDefaultBool=True,
        inURLList=[],
        inRoleHierarchyAllowedDict=lUACClientDict,
    )
    # TEST Add user VLADICK
    # NOTE(review): no inRoleHierarchyAllowedDict is passed for this login,
    # unlike the two logins above - confirm that this is intended.
    Orchestrator.UACUpdate(
        inGSettings=gSettings,
        inADLoginStr="VLADICK",
        inADStr="",
        inADIsDefaultBool=True,
        inURLList=[],
    )

    # TEST Add Supertoken for the all access between robots
    # NOTE(review): the token is hard-coded in the source; consider loading it
    # from the environment or a secrets store before using this outside a demo.
    Orchestrator.UACSuperTokenUpdate(
        inGSettings=gSettings,
        inSuperTokenStr="1992-04-03-0643-ru-b4ff-openrpa52zzz",
    )

    # Add first interface! The per-OS branches are kept separate so that each
    # platform can be given its own port.
    if CrossOS.IS_WINDOWS_BOOL:
        Orchestrator.WebListenCreate(inGSettings=gSettings, inPortInt=1024)
    if CrossOS.IS_LINUX_BOOL:
        Orchestrator.WebListenCreate(inGSettings=gSettings, inPortInt=1024)

    # Restore DUMP
    Orchestrator.OrchestratorSessionRestore(inGSettings=gSettings)

    # Autoinit control panels starts with CP_
    lPyModules = Orchestrator.OrchestratorPySearchInit(
        inGlobPatternStr="ControlPanel\\CP_*.py",
        inDefStr="SettingsUpdate",
        inDefArgNameGSettingsStr="inGSettings",
        inAsyncInitBool=True,
    )

    # Control panels for the demo robots.
    # NOTE(review): both template paths are absolute and machine-specific
    # (drive D:), while the file routes in this script use paths relative to
    # "Demo\\..." - confirm before running on another machine.
    # lCPManager is rebound by the second call; presumably each ControlPanel
    # registers itself on construction - verify against pyOpenRPA.
    lCPManager = Orchestrator.Managers.ControlPanel(
        inControlPanelNameStr="HR_officer01ControlPanel",
        inRefreshHTMLJinja2TemplatePathStr=CrossOS.PathStr(
            "D:\\For work\\RPA\\OpenRPA\\Orchestrator\\Demo\\HR_officer01\\index.html"
        ),
        inJinja2TemplateRefreshBool=True,
    )
    lCPManager = Orchestrator.Managers.ControlPanel(
        inControlPanelNameStr="Treasurer01ControlPanel",
        inRefreshHTMLJinja2TemplatePathStr=CrossOS.PathStr(
            "D:\\For work\\RPA\\OpenRPA\\Orchestrator\\Demo\\Treasurer01\\index.html"
        ),
        inJinja2TemplateRefreshBool=True,
    )

    # Call the orchestrator def
    Orchestrator.Orchestrator(inGSettings=gSettings, inDumpRestoreBool=False)