@ -1,7 +1,7 @@
import json , os
import copy
from . import __Orchestrator__
from . Server import app # FAST API SERVER
from . Server import app , IdentifyAuthorize # FAST API SERVER
#ControlPanelDict
from pyOpenRPA . Tools import CrossOS
if CrossOS . IS_WINDOWS_BOOL : #CrossOS
@ -30,76 +30,6 @@ import io
from starlette . responses import StreamingResponse
from typing import Union
from fastapi . responses import JSONResponse
import base64
import uuid
import datetime
def IdentifyAuthorize ( inRequest : Request , inResponse : Response ,
inCookiesStr : Union [ str , None ] = Header ( default = None , alias = " Cookie " ) ,
inAuthorizationStr : Union [ str , None ] = Header ( default = " " , alias = " Authorization " ) ) :
if __Orchestrator__ . GSettingsGet ( ) . get ( " ServerDict " , { } ) . get ( " AccessUsers " , { } ) . get ( " FlagCredentialsAsk " , False ) :
lResult = { " Domain " : " " , " User " : " " }
#print("IdentifyAuthorize")
######################################
#Way 1 - try to find AuthToken
lCookies = cookies . SimpleCookie ( inCookiesStr ) # inRequest.headers.get("Cookie", "")
__Orchestrator__ . GSettingsGet ( )
lHeaderAuthorization = inAuthorizationStr . split ( " " )
if " AuthToken " in lCookies :
lCookieAuthToken = lCookies . get ( " AuthToken " , " " ) . value
if lCookieAuthToken :
#Find AuthToken in GlobalDict
if lCookieAuthToken in __Orchestrator__ . GSettingsGet ( ) . get ( " ServerDict " , { } ) . get ( " AccessUsers " , { } ) . get ( " AuthTokensDict " , { } ) :
#Auth Token Has Been Founded
lResult [ " Domain " ] = __Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lCookieAuthToken ] [ " Domain " ]
lResult [ " User " ] = __Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lCookieAuthToken ] [ " User " ]
#Set auth token
mOpenRPA = { }
mOpenRPA [ " AuthToken " ] = lCookieAuthToken
mOpenRPA [ " Domain " ] = lResult [ " Domain " ]
mOpenRPA [ " User " ] = lResult [ " User " ]
mOpenRPA [ " IsSuperToken " ] = __Orchestrator__ . GSettingsGet ( ) . get ( " ServerDict " , { } ) . get ( " AccessUsers " , { } ) . get ( " AuthTokensDict " , { } ) . get ( mOpenRPA [ " AuthToken " ] , { } ) . get ( " FlagDoNotExpire " , False )
return lAuthToken
######################################
#Way 2 - try to logon
if len ( lHeaderAuthorization ) == 2 :
llHeaderAuthorizationDecodedUserPasswordList = base64 . b64decode ( lHeaderAuthorization [ 1 ] ) . decode ( " utf-8 " ) . split (
" : " )
lUser = llHeaderAuthorizationDecodedUserPasswordList [ 0 ]
lPassword = llHeaderAuthorizationDecodedUserPasswordList [ 1 ]
lDomain = " "
if " \\ " in lUser :
lDomain = lUser . split ( " \\ " ) [ 0 ]
lUser = lUser . split ( " \\ " ) [ 1 ]
lLogonBool = __Orchestrator__ . OSCredentialsVerify ( inUserStr = lUser , inPasswordStr = lPassword , inDomainStr = lDomain )
#Check result
if lLogonBool :
lResult [ " Domain " ] = lDomain
lResult [ " User " ] = lUser
#Create token
lAuthToken = str ( uuid . uuid1 ( ) )
__Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lAuthToken ] = { }
__Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lAuthToken ] [ " Domain " ] = lResult [ " Domain " ]
__Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lAuthToken ] [ " User " ] = lResult [ " User " ]
__Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lAuthToken ] [ " FlagDoNotExpire " ] = False
__Orchestrator__ . GSettingsGet ( ) [ " ServerDict " ] [ " AccessUsers " ] [ " AuthTokensDict " ] [ lAuthToken ] [ " TokenDatetime " ] = datetime . datetime . now ( )
#Set-cookie
inResponse . set_cookie ( key = " AuthToken " , value = lAuthToken )
mOpenRPA = { }
mOpenRPA [ " AuthToken " ] = lAuthToken
mOpenRPA [ " Domain " ] = lResult [ " Domain " ]
mOpenRPA [ " User " ] = lResult [ " User " ]
mOpenRPA [ " IsSuperToken " ] = __Orchestrator__ . GSettingsGet ( ) . get ( " ServerDict " , { } ) . get ( " AccessUsers " , { } ) . get ( " AuthTokensDict " , { } ) . get ( mOpenRPA [ " AuthToken " ] , { } ) . get ( " FlagDoNotExpire " , False )
return lAuthToken
#inRequest.OpenRPASetCookie = {}
#New engine of server
#inRequest.OpenRPAResponseDict["SetCookies"]["AuthToken"] = lAuthToken
else :
raise HTTPException ( status_code = 401 , detail = " Попытка авторизации не прошла успешно (неверная пара логин / пароль) " , headers = { } )
######################################
else :
raise HTTPException ( status_code = 401 , detail = " Попытка авторизации не прошла успешно (неполная пара логин / пароль) " , headers = { ' Content-type ' : ' text/html ' , ' WWW-Authenticate ' : ' Basic ' } )
else : return None # Credentials are not required - return none
# # # # # # # # # # # #
@ -339,7 +269,7 @@ def pyOpenRPA_Processor(inRequest:Request, inAuthTokenStr:str = Depends(Identify
except Exception as e :
lActivityTypeListStr = " Ошибка чтения типа активности "
lHostStr = __Orchestrator__ . WebRequestHostGet ( inRequest = inRequest )
lWebAuditMessageStr = __Orchestrator__ . WebAuditMessageCreate ( in Request= inRequest , inHostStr = lHostStr , inOperationCodeStr = lActivityTypeListStr , inMessageStr = " pyOpenRPA_Processor " )
lWebAuditMessageStr = __Orchestrator__ . WebAuditMessageCreate ( in AuthTokenStr= inAuthTokenStr , inHostStr = lHostStr , inOperationCodeStr = lActivityTypeListStr , inMessageStr = " pyOpenRPA_Processor " )
if lL : lL . info ( lWebAuditMessageStr )
if lInput . get ( " ThreadBool " , False ) == False :
# Append in list
@ -356,7 +286,7 @@ def pyOpenRPA_ActivityListExecute(inRequest:Request, inAuthTokenStr:str = Depend
# Recieve the data
inGSettings = __Orchestrator__ . GSettingsGet ( )
lL = __Orchestrator__ . OrchestratorLoggerGet ( )
lValueStr = in Request. body
lValueStr = in BodyStr
# Превращение массива байт в объект
lInput = json . loads ( lValueStr )
# If list - operator plus
@ -370,7 +300,7 @@ def pyOpenRPA_ActivityListExecute(inRequest:Request, inAuthTokenStr:str = Depend
except Exception as e :
lActivityTypeListStr = " Ошибка чтения типа активности "
lHostStr = __Orchestrator__ . WebRequestHostGet ( inRequest = inRequest )
lWebAuditMessageStr = __Orchestrator__ . WebAuditMessageCreate ( in Request= inRequest , inHostStr = lHostStr , inOperationCodeStr = lActivityTypeListStr , inMessageStr = " pyOpenRPA_ActivityListExecute " )
lWebAuditMessageStr = __Orchestrator__ . WebAuditMessageCreate ( in AuthTokenStr= inAuthTokenStr , inHostStr = lHostStr , inOperationCodeStr = lActivityTypeListStr , inMessageStr = " pyOpenRPA_ActivityListExecute " )
if lL : lL . info ( lWebAuditMessageStr )
# Execution
lResultList = Processor . ActivityListExecute ( inGSettings = inGSettings , inActivityList = lInput )
@ -385,7 +315,7 @@ def pyOpenRPA_ActivityListExecute(inRequest:Request, inAuthTokenStr:str = Depend
except Exception as e :
lActivityTypeListStr = " Ошибка чтения типа активности "
lHostStr = __Orchestrator__ . WebRequestHostGet ( inRequest = inRequest )
lWebAuditMessageStr = __Orchestrator__ . WebAuditMessageCreate ( in Request= inRequest , inHostStr = lHostStr ,
lWebAuditMessageStr = __Orchestrator__ . WebAuditMessageCreate ( in AuthTokenStr= inAuthTokenStr , inHostStr = lHostStr ,
inOperationCodeStr = lActivityTypeListStr ,
inMessageStr = " pyOpenRPA_ActivityListExecute " )
if lL : lL . info ( lWebAuditMessageStr )
@ -584,7 +514,7 @@ def SettingsUpdate():
#{"Method": "POST", "URL": "/orpa/api/processor-queue-add", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_Processor, "ResponseContentType": "application/json"},
#{"Method": "POST", "URL": "/orpa/api/activity-list-execute", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ActivityListExecute, "ResponseContentType": "application/json"},
{ " Method " : " GET " , " URL " : " /orpa/api/helper-def-list/ " , " MatchType " : " BeginWith " , " ResponseDefRequestGlobal " : pyOpenRPA_Debugging_HelperDefList , " ResponseContentType " : " application/json " } ,
{ " Method " : " GET " , " URL " : " /orpa/api/helper- autofill/" , " MatchType " : " BeginWith " , " ResponseDefRequestGlobal " : pyOpenRPA_Debugging_HelperDefAutofill , " ResponseContentType " : " application/json " } ,
{ " Method " : " GET " , " URL " : " /orpa/api/helper- def- autofill/" , " MatchType " : " BeginWith " , " ResponseDefRequestGlobal " : pyOpenRPA_Debugging_HelperDefAutofill , " ResponseContentType " : " application/json " } ,
# AGENT
{ " Method " : " POST " , " URL " : " /orpa/agent/o2a " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_Agent_O2A , " ResponseContentType " : " application/json " } ,
{ " Method " : " POST " , " URL " : " /orpa/agent/a2o " , " MatchType " : " Equal " , " ResponseDefRequestGlobal " : pyOpenRPA_Agent_A2O , " ResponseContentType " : " application/json " }