You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
68 lines
3.2 KiB
68 lines
3.2 KiB
import psutil, datetime, logging, os, sys
|
|
|
|
|
|
# Config settings
# Folder that holds the pyOpenRPA package under test, relative to the current
# working directory. Set it to an empty string to skip the sys.path change.
lPyOpenRPASourceFolderPathStr = r"../Sources"

# Operations
# Put the configured folder first on sys.path so that pyOpenRPA imports made
# after this point are looked up there before anywhere else.
if lPyOpenRPASourceFolderPathStr:
    sys.path.insert(0, os.path.abspath(lPyOpenRPASourceFolderPathStr))
|
|
|
|
|
|
# Start import after config the pyOpenRPA folder
|
|
from pyOpenRPA.Orchestrator import SettingsTemplate # Import functionallity
|
|
from pyOpenRPA.Tools import CrossOS
|
|
from pyOpenRPA import Orchestrator # Import orchestrator main
|
|
from pyOpenRPA.Orchestrator.Server import app
|
|
import threading
|
|
|
|
from fastapi import Depends
|
|
from fastapi.responses import PlainTextResponse
|
|
from fastapi.responses import FileResponse
|
|
|
|
|
|
|
|
|
|
|
|
# Example of adding a custom route to the server (FastAPI): GET /test/threads
# Replies with plain text containing one "ПОТОК: <name>" line for every thread
# returned by threading.enumerate().
# If authorization is required, declare the handler like this instead:
#   def Threads(inAuthDict:dict=Depends(IdentifyAuthorize)):
# NOTE(review): IdentifyAuthorize is not imported in the visible part of this
# file - confirm where it comes from before enabling that signature.
# The description is kept in comments rather than a docstring because FastAPI
# would publish a docstring as the route description.
@app.get(path="/test/threads", tags=["Test"], response_class=PlainTextResponse)
def Threads():
    return "".join(f"ПОТОК: {lThread.name}\n" for lThread in threading.enumerate())
|
|
|
|
|
|
# Entry point: configure and start the Orchestrator.
# Run as administrator (ONLY FOR WINDOWS): when the process is on Windows and
# is not elevated, relaunch it elevated; otherwise configure and start here.
# The OS flag is tested first so the admin probe is never evaluated elsewhere.
if CrossOS.IS_WINDOWS_BOOL and not Orchestrator.OrchestratorIsAdmin():
    Orchestrator.OrchestratorRerunAsAdmin()
    print("Orchestrator will be run as administrator!")
else:
    gSettings = Orchestrator.GSettingsGet()
    # Alternative: build the settings from the template instead of taking the current ones:
    #   gSettings = SettingsTemplate.Create(inModeStr="BASIC")
    Orchestrator.OrchestratorLoggerGet().setLevel(logging.INFO)

    # TEST users "ND" and "rpa00": both receive the role hierarchy produced by
    # SettingsTemplate.__UACClientAdminCreate__().
    lUACClientDict = SettingsTemplate.__UACClientAdminCreate__()
    # NOTE(review): presumably turns off the credentials prompt of the web
    # server - test-only setting, confirm before using outside a test stand.
    gSettings["ServerDict"]["AccessUsers"]["FlagCredentialsAsk"] = False
    Orchestrator.UACUpdate(
        inGSettings=gSettings,
        inADLoginStr="ND",
        inADStr="",
        inADIsDefaultBool=True,
        inURLList=[],
        inRoleHierarchyAllowedDict=lUACClientDict,
    )
    Orchestrator.UACUpdate(
        inGSettings=gSettings,
        inADLoginStr="rpa00",
        inADStr="",
        inADIsDefaultBool=True,
        inURLList=[],
        inRoleHierarchyAllowedDict=lUACClientDict,
    )
    # TEST user "VLADICK": registered WITHOUT inRoleHierarchyAllowedDict,
    # unlike the two users above.
    # NOTE(review): confirm which default UACUpdate applies in that case.
    Orchestrator.UACUpdate(
        inGSettings=gSettings,
        inADLoginStr="VLADICK",
        inADStr="",
        inADIsDefaultBool=True,
        inURLList=[],
    )

    # TEST supertoken for access between robots.
    # NOTE(review): this is a secret hard-coded in source; replace it (and load
    # it from outside the repository) before any non-test deployment.
    Orchestrator.UACSuperTokenUpdate(
        inGSettings=gSettings,
        inSuperTokenStr="1992-04-03-0643-ru-b4ff-openrpa52zzz",
    )

    # Add the first web interface. The per-OS branches are kept separate so the
    # port can be tuned for each platform; both currently use 1024.
    if CrossOS.IS_WINDOWS_BOOL:
        Orchestrator.WebListenCreate(inGSettings=gSettings, inPortInt=1024)
    if CrossOS.IS_LINUX_BOOL:
        Orchestrator.WebListenCreate(inGSettings=gSettings, inPortInt=1024)

    # Restore DUMP
    Orchestrator.OrchestratorSessionRestore(inGSettings=gSettings)

    # Auto-init every config.py found one folder below "Demo". The pattern is
    # built with os.path.join so it uses the native separator: on Windows this
    # is the same "Demo\*\config.py" string as before, on Linux "Demo/*/config.py".
    lPyModules = Orchestrator.OrchestratorPySearchInit(
        inGlobPatternStr=os.path.join("Demo", "*", "config.py"),
        inAsyncInitBool=True,
    )

    # Call the orchestrator def
    Orchestrator.Orchestrator(inGSettings=gSettings, inDumpRestoreBool=False)
|
|
|
|
|
|
|