|
|
|
@ -1,5 +1,6 @@
|
|
|
|
|
from re import I
|
|
|
|
|
import subprocess, json, psutil, time, os
|
|
|
|
|
import typing
|
|
|
|
|
from pyOpenRPA.Tools import CrossOS
|
|
|
|
|
if CrossOS.IS_WINDOWS_BOOL: import win32security #CrossOS
|
|
|
|
|
if CrossOS.IS_LINUX_BOOL: from simplepam import authenticate #CrossOS
|
|
|
|
@ -42,7 +43,9 @@ import math
|
|
|
|
|
import glob # search the files
|
|
|
|
|
import urllib
|
|
|
|
|
|
|
|
|
|
from . import ServerSettings
|
|
|
|
|
|
|
|
|
|
from fastapi import FastAPI, Depends
|
|
|
|
|
|
|
|
|
|
#Единый глобальный словарь (За основу взять из Settings.py)
|
|
|
|
|
gSettingsDict = None
|
|
|
|
@ -1053,6 +1056,101 @@ def GSettingsKeyListValueOperatorPlus(inValue, inKeyList=None, inGSettings = Non
|
|
|
|
|
# # # # # # # # # # # # # # # # # # # # # # #
|
|
|
|
|
# OrchestratorWeb defs
|
|
|
|
|
# # # # # # # # # # # # # # # # # # # # # # #
|
|
|
|
|
def WebUserLoginGet(inAuthTokenStr: str=None) -> str:
|
|
|
|
|
"""L+,W+: Получить логин авторизованного пользователя. Если авторизация не производилась - вернуть None
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: Логин пользователя
|
|
|
|
|
:rtype: str
|
|
|
|
|
"""
|
|
|
|
|
if inAuthTokenStr is None: return None
|
|
|
|
|
inGS = GSettingsGet() # Get the global settings
|
|
|
|
|
return inGS.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("User", None)
|
|
|
|
|
|
|
|
|
|
def WebUserDomainGet(inAuthTokenStr: str=None) -> str:
|
|
|
|
|
"""L+,W+: Получить домен авторизованного пользователя. Если авторизация не производилась - вернуть None
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: Домен пользователя
|
|
|
|
|
:rtype: str
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if inAuthTokenStr is None: return None
|
|
|
|
|
inGS = GSettingsGet() # Get the global settings
|
|
|
|
|
return inGS.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("Domain", None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def WebUserInfoGet(inAuthTokenStr=None):
|
|
|
|
|
"""L+,W+: Информация о пользователе, который отправил HTTP запрос.
|
|
|
|
|
|
|
|
|
|
:param inRequest: Экземпляр HTTP request. Опционален, если сообщение фиксируется из под потока, который был инициирован запросом пользователя
|
|
|
|
|
:return: Сведения в формате {"DomainUpperStr": "PYOPENRPA", "UserNameUpperStr": "IVAN.MASLOV"}
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
lResultDict = {
|
|
|
|
|
"DomainUpperStr": WebUserDomainGet(inAuthTokenStr=inAuthTokenStr).upper(),
|
|
|
|
|
"UserNameUpperStr": WebUserLoginGet(inAuthTokenStr=inAuthTokenStr).upper()
|
|
|
|
|
}
|
|
|
|
|
return lResultDict
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"DomainUpperStr": None, "UserNameUpperStr": None}
|
|
|
|
|
|
|
|
|
|
def WebUserIsSuperToken(inAuthTokenStr: str=None):
|
|
|
|
|
"""L+,W+: [ИЗМЕНЕНИЕ В 1.3.1] Проверить, авторизован ли HTTP запрос с помощью супер токена (токен, который не истекает).
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: True - является супертокеном; False - не является супертокеном; None - авторизация не производилась
|
|
|
|
|
"""
|
|
|
|
|
if inAuthTokenStr is None: return None
|
|
|
|
|
inGSettings = GSettingsGet() # Get the global settings
|
|
|
|
|
lIsSuperTokenBool = False
|
|
|
|
|
# Get Flag is supertoken (True|False)
|
|
|
|
|
lIsSuperTokenBool = inGSettings.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("FlagDoNotExpire", False)
|
|
|
|
|
return lIsSuperTokenBool
|
|
|
|
|
|
|
|
|
|
def WebUserUACHierarchyGet(inAuthTokenStr: str=None) -> dict:
|
|
|
|
|
"""L+,W+: [ИЗМЕНЕНИЕ В 1.3.1] Вернуть словарь доступа UAC в отношении пользователя, который выполнил HTTP запрос inRequest
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: UAC словарь доступа или {}, что означает полный доступ
|
|
|
|
|
"""
|
|
|
|
|
lDomainUpperStr = WebUserDomainGet(inAuthTokenStr=inAuthTokenStr).upper()
|
|
|
|
|
lUserUpperStr = WebUserLoginGet(inAuthTokenStr=inAuthTokenStr).upper()
|
|
|
|
|
if lUserUpperStr is None: return {}
|
|
|
|
|
else: return GSettingsGet().get("ServerDict", {}).get("AccessUsers", {}).get("RuleDomainUserDict", {}).get((lDomainUpperStr, lUserUpperStr), {}).get("RoleHierarchyAllowedDict", {})
|
|
|
|
|
|
|
|
|
|
def WebUserUACCheck(inAuthTokenStr:str=None, inKeyList:list=None) -> bool:
|
|
|
|
|
"""L+,W+: Проверить UAC доступ списка ключей для пользователя
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: True - доступ имеется, False - доступа нет
|
|
|
|
|
:rtype: bool
|
|
|
|
|
"""
|
|
|
|
|
if inAuthTokenStr is None: return True # Если авторизации не происходило - супердоступ
|
|
|
|
|
lResult = True # Init flag
|
|
|
|
|
lRoleHierarchyDict = WebUserUACHierarchyGet(inAuthTokenStr=inAuthTokenStr) # get the Hierarchy
|
|
|
|
|
# Try to get value from key list
|
|
|
|
|
lKeyValue = lRoleHierarchyDict # Init the base
|
|
|
|
|
for lItem in inKeyList:
|
|
|
|
|
if type(lKeyValue) is dict:
|
|
|
|
|
if lItem in lKeyValue: # Has key
|
|
|
|
|
lKeyValue = lKeyValue[lItem] # Get the value and go to the next loop iteration
|
|
|
|
|
else: # Else branch - true or false
|
|
|
|
|
if len(lKeyValue)>0: # False - if Dict has some elements
|
|
|
|
|
lResult = False # Set the False Flag
|
|
|
|
|
else:
|
|
|
|
|
lResult = True # Set the True flag
|
|
|
|
|
break # Stop the loop
|
|
|
|
|
else: # Has element with no detalization - return True
|
|
|
|
|
lResult = True # Set the flag
|
|
|
|
|
break # Close the loop
|
|
|
|
|
return lResult # Return the result
|
|
|
|
|
|
|
|
|
|
def WebURLIndexChange(inURLIndexStr:str ="/"):
|
|
|
|
|
"""L+,W+: Изменить адрес главной страницы Оркестратора. По умолчанию '/'
|
|
|
|
|
|
|
|
|
@ -1315,107 +1413,31 @@ def WebRequestGet():
|
|
|
|
|
lCurrentThread = threading.current_thread()
|
|
|
|
|
if hasattr(lCurrentThread, "request"):
|
|
|
|
|
return lCurrentThread.request
|
|
|
|
|
from fastapi import FastAPI
|
|
|
|
|
|
|
|
|
|
def WebAppGet() -> FastAPI:
|
|
|
|
|
"""L+,W+: Вернуть экземпляр веб сервера fastapi.FastAPI (app). Подробнее про app см. https://fastapi.tiangolo.com/tutorial/first-steps/
|
|
|
|
|
"""
|
|
|
|
|
return Server.app
|
|
|
|
|
def WebUserLoginGet(inAuthTokenStr: str=None) -> str:
|
|
|
|
|
"""L+,W+: Получить логин авторизованного пользователя. Если авторизация не производилась - вернуть None
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: Логин пользователя
|
|
|
|
|
:rtype: str
|
|
|
|
|
"""
|
|
|
|
|
if inAuthTokenStr is None: return None
|
|
|
|
|
inGS = GSettingsGet() # Get the global settings
|
|
|
|
|
return inGS.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("User", None)
|
|
|
|
|
|
|
|
|
|
def WebUserDomainGet(inAuthTokenStr: str=None) -> str:
|
|
|
|
|
"""L+,W+: Получить домен авторизованного пользователя. Если авторизация не производилась - вернуть None
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: Домен пользователя
|
|
|
|
|
:rtype: str
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if inAuthTokenStr is None: return None
|
|
|
|
|
inGS = GSettingsGet() # Get the global settings
|
|
|
|
|
return inGS.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("Domain", None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def WebUserInfoGet(inAuthTokenStr=None):
|
|
|
|
|
"""L+,W+: Информация о пользователе, который отправил HTTP запрос.
|
|
|
|
|
|
|
|
|
|
:param inRequest: Экземпляр HTTP request. Опционален, если сообщение фиксируется из под потока, который был инициирован запросом пользователя
|
|
|
|
|
:return: Сведения в формате {"DomainUpperStr": "PYOPENRPA", "UserNameUpperStr": "IVAN.MASLOV"}
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
lResultDict = {
|
|
|
|
|
"DomainUpperStr": WebUserDomainGet(inAuthTokenStr=inAuthTokenStr).upper(),
|
|
|
|
|
"UserNameUpperStr": WebUserLoginGet(inAuthTokenStr=inAuthTokenStr).upper()
|
|
|
|
|
}
|
|
|
|
|
return lResultDict
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"DomainUpperStr": None, "UserNameUpperStr": None}
|
|
|
|
|
def WebAuthDefGet():
|
|
|
|
|
"""Вернуть функцию авторизации пользователя. Функция может использоваться для доменной авторизации.
|
|
|
|
|
|
|
|
|
|
def WebUserIsSuperToken(inAuthTokenStr: str=None):
|
|
|
|
|
"""L+,W+: [ИЗМЕНЕНИЕ В 1.3.1] Проверить, авторизован ли HTTP запрос с помощью супер токена (токен, который не истекает).
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: True - является супертокеном; False - не является супертокеном; None - авторизация не производилась
|
|
|
|
|
"""
|
|
|
|
|
if inAuthTokenStr is None: return None
|
|
|
|
|
inGSettings = GSettingsGet() # Get the global settings
|
|
|
|
|
lIsSuperTokenBool = False
|
|
|
|
|
# Get Flag is supertoken (True|False)
|
|
|
|
|
lIsSuperTokenBool = inGSettings.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("FlagDoNotExpire", False)
|
|
|
|
|
return lIsSuperTokenBool
|
|
|
|
|
|
|
|
|
|
def WebUserUACHierarchyGet(inAuthTokenStr: str=None) -> dict:
|
|
|
|
|
"""L+,W+: [ИЗМЕНЕНИЕ В 1.3.1] Вернуть словарь доступа UAC в отношении пользователя, который выполнил HTTP запрос inRequest
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: UAC словарь доступа или {}, что означает полный доступ
|
|
|
|
|
"""
|
|
|
|
|
lDomainUpperStr = WebUserDomainGet(inAuthTokenStr=inAuthTokenStr).upper()
|
|
|
|
|
lUserUpperStr = WebUserLoginGet(inAuthTokenStr=inAuthTokenStr).upper()
|
|
|
|
|
if lUserUpperStr is None: return {}
|
|
|
|
|
else: return GSettingsGet().get("ServerDict", {}).get("AccessUsers", {}).get("RuleDomainUserDict", {}).get((lDomainUpperStr, lUserUpperStr), {}).get("RoleHierarchyAllowedDict", {})
|
|
|
|
|
|
|
|
|
|
def WebUserUACCheck(inAuthTokenStr:str=None, inKeyList:list=None) -> bool:
|
|
|
|
|
"""L+,W+: Проверить UAC доступ списка ключей для пользователя
|
|
|
|
|
|
|
|
|
|
:param inAuthTokenStr: Токен авторизации пользователя / бота, по умолчанию None (не установлен)
|
|
|
|
|
:type inAuthTokenStr: str, опционально
|
|
|
|
|
:return: True - доступ имеется, False - доступа нет
|
|
|
|
|
:rtype: bool
|
|
|
|
|
# ПРИМЕР Если требуется авторизация пользователя (получить inAuthTokenStr)
|
|
|
|
|
from fastapi import Request
|
|
|
|
|
from fastapi.responses import JSONResponse, HTMLResponse
|
|
|
|
|
from pyOpenRPA import Orchestrator
|
|
|
|
|
@app.post("/url/to/def",response_class=JSONResponse)
|
|
|
|
|
async def some_def(inRequest:Request, inAuthTokenStr:str=Depends(Orchestrator.WebAuthDefGet())):
|
|
|
|
|
l_input_dict = await inRequest.json()
|
|
|
|
|
if lValueStr == None or lValueStr == b"": lValueStr=""
|
|
|
|
|
else: lValueStr = lValueStr.decode("utf8")
|
|
|
|
|
|
|
|
|
|
:return: Функция авторизации
|
|
|
|
|
:rtype: def
|
|
|
|
|
"""
|
|
|
|
|
if inAuthTokenStr is None: return True # Если авторизации не происходило - супердоступ
|
|
|
|
|
lResult = True # Init flag
|
|
|
|
|
lRoleHierarchyDict = WebUserUACHierarchyGet(inAuthTokenStr=inAuthTokenStr) # get the Hierarchy
|
|
|
|
|
# Try to get value from key list
|
|
|
|
|
lKeyValue = lRoleHierarchyDict # Init the base
|
|
|
|
|
for lItem in inKeyList:
|
|
|
|
|
if type(lKeyValue) is dict:
|
|
|
|
|
if lItem in lKeyValue: # Has key
|
|
|
|
|
lKeyValue = lKeyValue[lItem] # Get the value and go to the next loop iteration
|
|
|
|
|
else: # Else branch - true or false
|
|
|
|
|
if len(lKeyValue)>0: # False - if Dict has some elements
|
|
|
|
|
lResult = False # Set the False Flag
|
|
|
|
|
else:
|
|
|
|
|
lResult = True # Set the True flag
|
|
|
|
|
break # Stop the loop
|
|
|
|
|
else: # Has element with no detalization - return True
|
|
|
|
|
lResult = True # Set the flag
|
|
|
|
|
break # Close the loop
|
|
|
|
|
return lResult # Return the result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return ServerSettings.IdentifyAuthorize
|
|
|
|
|
|
|
|
|
|
def StorageRobotExists(inRobotNameStr):
|
|
|
|
|
"""L+,W+: Проверить, существует ли ключ inRobotNameStr в хранилище пользовательской информации StorageDict (GSettings > StarageDict)
|
|
|
|
|