4. Как использовать?¶
Как запустить?¶
Хотите выполнить запуск Оркестратора?
Для этого достаточно (выбрать одно):
запустить демо-стэнд: запустить .cmd файл, расположенный в папке pyOpenRPA по адресу: Orchestratorstart.cmd (для Windows) и start.sh (для Linux). Далее перейти в браузер по адресу: http://localhost:1024
в свой .py скрипт добавить следующий код (см. ниже)
if __name__ == "__main__": # New init way - allow run as module -m PyOpenRPA.Orchestrator
from pyOpenRPA import Orchestrator # Import orchestrator main
gSettings = SettingsTemplate.Create(inModeStr="BASIC") # Create GSettings with basic configuration - no more config is available from the box - you can create own
# Call the orchestrator main def
Orchestrator.Orchestrator(inGSettings=gSettings)
Шаблоны функций веб-сервера (с использованием FastAPI)¶
# ПРИМЕР Если НЕ требуется авторизация пользователя (получить inAuthTokenStr)
from fastapi import Request
from fastapi.responses import JSONResponse, HTMLResponse
@app.post("/url/to/def",response_class=JSONResponse)
async def some_def(inRequest:Request):
l_input_dict = await inRequest.json()
if lValueStr == None or lValueStr == b"": lValueStr=""
else: lValueStr = lValueStr.decode("utf8")
# ПРИМЕР Если требуется авторизация пользователя (получить inAuthTokenStr)
from fastapi import Request
from fastapi.responses import JSONResponse, HTMLResponse
from pyOpenRPA import Orchestrator
@app.post("/url/to/def",response_class=JSONResponse)
async def some_def(inRequest:Request, inAuthTokenStr:str=Depends(Orchestrator.WebAuthDefGet())):
l_input_dict = await inRequest.json()
if lValueStr == None or lValueStr == b"": lValueStr=""
else: lValueStr = lValueStr.decode("utf8")
Конфигурационный файл config.py¶
Также вы можете выполнить более тонкую настройку параметров Оркестратора. Ниже пример такой настройки:
import psutil, datetime, logging, os, sys # Config settings lPyOpenRPASourceFolderPathStr = (r"../Sources") # Path for test pyOpenRPA package # Operations if lPyOpenRPASourceFolderPathStr != "": sys.path.insert(0,os.path.abspath(os.path.join(lPyOpenRPASourceFolderPathStr))) # Path for test pyOpenRPA package # Start import after config the pyOpenRPA folder from pyOpenRPA.Orchestrator import SettingsTemplate # Import functionallity from pyOpenRPA.Tools import CrossOS from pyOpenRPA import Orchestrator # Import orchestrator main from pyOpenRPA.Orchestrator.Server import app import threading from fastapi import Depends from fastapi.responses import PlainTextResponse from fastapi.responses import FileResponse # Пример создания функции на сервере (FASTAPI) /test/threads @app.get(path="/test/threads",tags=["Test"],response_class=PlainTextResponse) def Threads():# inAuthDict:dict=Depends(IdentifyAuthorize) #def Threads(inAuthDict:dict=Depends(IdentifyAuthorize)):# Используй, если требуется авторизация lThreadStr = "" for thread in threading.enumerate(): lThreadStr+=f"ПОТОК: {thread.name}\n" #print(thread.name) return lThreadStr #Run as administrator (ONLY FOR WINDOWS) if not Orchestrator.OrchestratorIsAdmin() and CrossOS.IS_WINDOWS_BOOL: Orchestrator.OrchestratorRerunAsAdmin() print(f"Orchestrator will be run as administrator!") else: gSettings = Orchestrator.GSettingsGet() Orchestrator.OrchestratorLoggerGet().setLevel(logging.INFO) #lUACClientDict = SettingsTemplate.__UACClientAdminCreate__() Orchestrator.OrchestratorSetCredentialsAsk(inCredentialAskBool=False) #Orchestrator.UACUpdate(inGSettings=gSettings, inADLoginStr="rpa00", inADStr="", inADIsDefaultBool=True, inURLList=[], inRoleHierarchyAllowedDict=lUACClientDict) # TEST Add Supertoken for the all access between robots #Orchestrator.UACSuperTokenUpdate(inGSettings=gSettings, inSuperTokenStr="1992-04-03-0643-ru-b4ff-openrpa52zzz") # Add first interface! if CrossOS.IS_WINDOWS_BOOL: Orchestrator.WebListenCreate(inGSettings=gSettings, inPortInt=1024) if CrossOS.IS_LINUX_BOOL: Orchestrator.WebListenCreate(inGSettings=gSettings, inPortInt=1024) # Restore DUMP Orchestrator.OrchestratorSessionRestore(inGSettings=gSettings) # Autoinit control panels starts with CP_ lPyModules = Orchestrator.OrchestratorPySearchInit(inGlobPatternStr="Demo\\*\\config.py", inAsyncInitBool=True, inPackageLevelInt=1) #lPyModules2 = Orchestrator.OrchestratorPySearchInit(inGlobPatternStr="..\\..\\KPI_Effect\\packages\\*_control_panel\\config.py", inAsyncInitBool=True, inPackageLevelInt=1) # Call the orchestrator def Orchestrator.Orchestrator(inGSettings=gSettings, inDumpRestoreBool=False)