async long url in progress

dev-fastapi
Ivan Maslov 2 years ago
parent 62594d3db0
commit 7def532500

@ -8,6 +8,7 @@
#from http.client import HTTPException #from http.client import HTTPException
import threading import threading
import typing
from pyOpenRPA.Tools import CrossOS from pyOpenRPA.Tools import CrossOS
@ -129,6 +130,7 @@ from . import ServerSettings
async def BackwardCompatibility(inRequest:Request, inResponse:Response, inAuthTokenStr = None): async def BackwardCompatibility(inRequest:Request, inResponse:Response, inAuthTokenStr = None):
lHTTPRequest = ServerBC.HTTPRequestOld(inRequest=inRequest, inResponse=inResponse, inAuthTokenStr=inAuthTokenStr) lHTTPRequest = ServerBC.HTTPRequestOld(inRequest=inRequest, inResponse=inResponse, inAuthTokenStr=inAuthTokenStr)
lHTTPRequest.path = inRequest.url.path lHTTPRequest.path = inRequest.url.path
#print(f"WEB START: {lHTTPRequest.path}")
inBodyStr = await inRequest.body() inBodyStr = await inRequest.body()
if inBodyStr == None: inBodyStr = "" if inBodyStr == None: inBodyStr = ""
else: inBodyStr = inBodyStr.decode("utf8") else: inBodyStr = inBodyStr.decode("utf8")
@ -139,6 +141,7 @@ async def BackwardCompatibility(inRequest:Request, inResponse:Response, inAuthTo
lResult = lHTTPRequest.do_GET(inBodyStr=inBodyStr) lResult = lHTTPRequest.do_GET(inBodyStr=inBodyStr)
elif inRequest.method=="POST": elif inRequest.method=="POST":
lResult = lHTTPRequest.do_POST(inBodyStr=inBodyStr) lResult = lHTTPRequest.do_POST(inBodyStr=inBodyStr)
#print(f"WEB STOP: {lHTTPRequest.path}")
if lHTTPRequest.OpenRPAResponseDict["Headers"]["Content-type"] != None: if lHTTPRequest.OpenRPAResponseDict["Headers"]["Content-type"] != None:
return StreamingResponse(io.BytesIO(lResult), media_type=lHTTPRequest.OpenRPAResponseDict["Headers"]["Content-type"]) return StreamingResponse(io.BytesIO(lResult), media_type=lHTTPRequest.OpenRPAResponseDict["Headers"]["Content-type"])

@ -53,8 +53,10 @@ def HiddenJSInitGenerate(inRequest, inGSettings):
# Generate CP HTML + JSON # Generate CP HTML + JSON
# Return {"Key":{"",""}} # Return {"Key":{"",""}}
def HiddenCPDictGenerate(inRequest, inGSettings): def HiddenCPDictGenerate(inAuthTokenStr):
dUAC = inRequest.UACClientCheck # Alias. inGSettings = __Orchestrator__.GSettingsGet()
dUAC = lambda inKeyList: __Orchestrator__.WebUserUACCheck(inAuthTokenStr=inAuthTokenStr, inKeyList=inKeyList)
#dUAC = d_uac #inRequest.UACClientCheck # Alias.
lUACCPTemplateKeyList=["pyOpenRPADict","CPKeyDict"] lUACCPTemplateKeyList=["pyOpenRPADict","CPKeyDict"]
lL = inGSettings["Logger"] # Alias for logger lL = inGSettings["Logger"] # Alias for logger
# Create result JSON # Create result JSON
@ -62,15 +64,15 @@ def HiddenCPDictGenerate(inRequest, inGSettings):
lRenderFunctionsRobotDict = inGSettings["ServerDict"]["ControlPanelDict"] lRenderFunctionsRobotDict = inGSettings["ServerDict"]["ControlPanelDict"]
for lItemKeyStr in lRenderFunctionsRobotDict: for lItemKeyStr in lRenderFunctionsRobotDict:
lItemDict = lRenderFunctionsRobotDict[lItemKeyStr] lItemDict = lRenderFunctionsRobotDict[lItemKeyStr]
lUACBool = dUAC(inRoleKeyList=lUACCPTemplateKeyList+[lItemKeyStr]) # Check if render function is applicable User Access Rights (UAC) lUACBool = dUAC(inKeyList=lUACCPTemplateKeyList+[lItemKeyStr]) # Check if render function is applicable User Access Rights (UAC)
if lItemKeyStr=="VersionCheck": lUACBool=True # For backward compatibility for the old fron version which not reload page when new orch version is comming if lItemKeyStr=="VersionCheck": lUACBool=True # For backward compatibility for the old fron version which not reload page when new orch version is comming
if lUACBool: # Run function if UAC is TRUE if lUACBool: # Run function if UAC is TRUE
lCPItemDict = {"HTMLStr": None, "JSONDict":None} lCPItemDict = {"HTMLStr": None, "JSONDict":None}
try: try:
# HTML Render # HTML Render
lCPItemDict["HTMLStr"] = lItemDict.OnRefreshHTMLStr(inRequest=inRequest) lCPItemDict["HTMLStr"] = lItemDict.OnRefreshHTMLStr(inAuthTokenStr=inAuthTokenStr)
# JSONGeneratorDef # JSONGeneratorDef
lCPItemDict["JSONDict"] = lItemDict.OnRefreshJSONDict(inRequest=inRequest) lCPItemDict["JSONDict"] = lItemDict.OnRefreshJSONDict(inAuthTokenStr=inAuthTokenStr)
except Exception as e: except Exception as e:
lL.exception(f"EXCEPTION WHEN HTML/ JSON RENDER") lL.exception(f"EXCEPTION WHEN HTML/ JSON RENDER")
# Insert CPItemDict in result CPDict # Insert CPItemDict in result CPDict
@ -78,14 +80,15 @@ def HiddenCPDictGenerate(inRequest, inGSettings):
return lCPDict return lCPDict
# Return {"Key":{"",""}} # Return {"Key":{"",""}}
def HiddenRDPDictGenerate(inRequest, inGSettings): def HiddenRDPDictGenerate(inAuthTokenStr):
dUAC = inRequest.UACClientCheck # Alias. inGSettings = __Orchestrator__.GSettingsGet()
dUAC = lambda inKeyList: __Orchestrator__.WebUserUACCheck(inAuthTokenStr=inAuthTokenStr, inKeyList=inKeyList)
lUACRDPTemplateKeyList=["pyOpenRPADict","RDPKeyDict"] lUACRDPTemplateKeyList=["pyOpenRPADict","RDPKeyDict"]
lRDPDict = {"HandlebarsList":[]} lRDPDict = {"HandlebarsList":[]}
# Iterate throught the RDP list # Iterate throught the RDP list
for lRDPSessionKeyStrItem in inGSettings["RobotRDPActive"]["RDPList"]: for lRDPSessionKeyStrItem in inGSettings["RobotRDPActive"]["RDPList"]:
# Check UAC # Check UAC
if dUAC(inRoleKeyList=lUACRDPTemplateKeyList+[lRDPSessionKeyStrItem]): if dUAC(inKeyList=lUACRDPTemplateKeyList+[lRDPSessionKeyStrItem]):
lRDPConfiguration = inGSettings["RobotRDPActive"]["RDPList"][ lRDPConfiguration = inGSettings["RobotRDPActive"]["RDPList"][
lRDPSessionKeyStrItem] # Get the configuration dict lRDPSessionKeyStrItem] # Get the configuration dict
lDataItemDict = {"SessionKeyStr": "", "SessionHexStr": "", "IsFullScreenBool": False, lDataItemDict = {"SessionKeyStr": "", "SessionHexStr": "", "IsFullScreenBool": False,
@ -102,15 +105,16 @@ def HiddenRDPDictGenerate(inRequest, inGSettings):
return lRDPDict return lRDPDict
# Return {"HostNameUpperStr;UserUpperStr":{"IsListenBool":True}, "HandlebarsList":[{"HostnameUpperStr":"","UserUpperStr":"","IsListenBool":True}]} # Return {"HostNameUpperStr;UserUpperStr":{"IsListenBool":True}, "HandlebarsList":[{"HostnameUpperStr":"","UserUpperStr":"","IsListenBool":True}]}
def HiddenAgentDictGenerate(inRequest, inGSettings): def HiddenAgentDictGenerate(inAuthTokenStr):
dUAC = inRequest.UACClientCheck # Alias. inGSettings = __Orchestrator__.GSettingsGet()
dUAC = lambda inKeyList: __Orchestrator__.WebUserUACCheck(inAuthTokenStr=inAuthTokenStr, inKeyList=inKeyList)
lUACAgentTemplateKeyList=["pyOpenRPADict","AgentKeyDict"] lUACAgentTemplateKeyList=["pyOpenRPADict","AgentKeyDict"]
lAgentDict = {"HandlebarsList":[]} lAgentDict = {"HandlebarsList":[]}
# Iterate throught the RDP list # Iterate throught the RDP list
for lAgentItemKeyStrItem in inGSettings["AgentDict"]: for lAgentItemKeyStrItem in inGSettings["AgentDict"]:
# Check UAC # Check UAC
lKeyStr = f"{lAgentItemKeyStrItem[0]};{lAgentItemKeyStrItem[1]}" # turple ("HostNameUpperStr","UserUpperStr") > Str "HostNameUpperStr;UserUpperStr" lKeyStr = f"{lAgentItemKeyStrItem[0]};{lAgentItemKeyStrItem[1]}" # turple ("HostNameUpperStr","UserUpperStr") > Str "HostNameUpperStr;UserUpperStr"
if dUAC(inRoleKeyList=lUACAgentTemplateKeyList+[lKeyStr]): if dUAC(inKeyList=lUACAgentTemplateKeyList+[lKeyStr]):
lDataItemDict = inGSettings["AgentDict"][lAgentItemKeyStrItem] lDataItemDict = inGSettings["AgentDict"][lAgentItemKeyStrItem]
lDataItemAgentDict = copy.deepcopy(lDataItemDict) lDataItemAgentDict = copy.deepcopy(lDataItemDict)
lDataItemAgentDict["ActivityList"] = [] lDataItemAgentDict["ActivityList"] = []
@ -128,33 +132,42 @@ def HiddenAgentDictGenerate(inRequest, inGSettings):
# Client: mGlobal.pyOpenRPA.ServerDataHashStr # Client: mGlobal.pyOpenRPA.ServerDataHashStr
# Client: mGlobal.pyOpenRPA.ServerDataDict # Client: mGlobal.pyOpenRPA.ServerDataDict
def pyOpenRPA_ServerData(inRequest,inGSettings): import asyncio
from fastapi import Request
@app.post("/orpa/client/server-data",response_class=JSONResponse)
#{"Method": "POST", "URL": "/orpa/client/server-data", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerData, "ResponseContentType": "application/json"},
async def pyOpenRPA_ServerData(inRequest: Request):
inGSettings = __Orchestrator__.GSettingsGet()
print(f"START pyOpenRPA_ServerData")
# Extract the hash value from request # Extract the hash value from request
lValueStr = inRequest.body lValueStr = await inRequest.body()
lValueStr= lValueStr.decode("utf8")
# Generate ServerDataDict # Generate ServerDataDict
lFlagDoGenerateBool = True lFlagDoGenerateBool = True
while lFlagDoGenerateBool: while lFlagDoGenerateBool:
lServerDataDict = { lServerDataDict = {
"CPDict": HiddenCPDictGenerate(inRequest=inRequest, inGSettings=inGSettings), "CPDict": None,#HiddenCPDictGenerate(inRequest=inRequest, inGSettings=inGSettings),
"RDPDict": HiddenRDPDictGenerate(inRequest=inRequest, inGSettings=inGSettings), "RDPDict": None,#HiddenRDPDictGenerate(inRequest=inRequest, inGSettings=inGSettings),
"AgentDict": HiddenAgentDictGenerate(inRequest=inRequest, inGSettings=inGSettings), "AgentDict": None,#HiddenAgentDictGenerate(inRequest=inRequest, inGSettings=inGSettings),
"UserDict": {"UACClientDict": inRequest.OpenRPA["DefUserRoleHierarchyGet"](), "CWDPathStr": os.getcwd(), "VersionStr": inGSettings["VersionStr"]}, "UserDict": {"UACClientDict": {}, "CWDPathStr": os.getcwd(), "VersionStr": inGSettings["VersionStr"]},
} } # inRequest.OpenRPA["DefUserRoleHierarchyGet"]()
# Create JSON # Create JSON
lServerDataDictJSONStr = json.dumps(lServerDataDict) lServerDataDictJSONStr = json.dumps(lServerDataDict)
# Generate hash # Generate hash
lServerDataHashStr = str(hash(lServerDataDictJSONStr)) lServerDataHashStr = str(hash(lServerDataDictJSONStr))
if lValueStr!=lServerDataHashStr and lServerDataHashStr!= "" and lServerDataHashStr!= None: # Case if Hash is not equal if lValueStr!=lServerDataHashStr and lServerDataHashStr!= "" and lServerDataHashStr!= None: # Case if Hash is not equal
print(f"lValueStr:{lValueStr}, lServerDataHashStr:{lServerDataHashStr}")
lFlagDoGenerateBool = False lFlagDoGenerateBool = False
else: # Case Hashes are equal else: # Case Hashes are equal
time.sleep(inGSettings["Client"]["Session"]["ControlPanelRefreshIntervalSecFloat"]) await asyncio.sleep(inGSettings["Client"]["Session"]["ControlPanelRefreshIntervalSecFloat"])
# Return the result if Hash is changed # Return the result if Hash is changed
lResult = {"HashStr": lServerDataHashStr, "ServerDataDict": lServerDataDict} lResult = {"HashStr": lServerDataHashStr, "ServerDataDict": lServerDataDict}
inResponseDict = inRequest.OpenRPAResponseDict #inResponseDict = inRequest.OpenRPAResponseDict
# Send message back to client # Send message back to client
message = json.dumps(lResult) #message = json.dumps(lResult)
# Write content as utf-8 data # Write content as utf-8 data
inResponseDict["Body"] = bytes(message, "utf8") #inResponseDict["Body"] = bytes(message, "utf8")
print(f"STOP pyOpenRPA_ServerData")
return lResult return lResult
# GET # GET
@ -171,7 +184,7 @@ def pyOpenRPA_ServerJSInit(inRequest,inGSettings):
# /pyOpenRPA/ServerLog return {"HashStr" , "ServerLogList": ["row 1", "row 2"]} # /pyOpenRPA/ServerLog return {"HashStr" , "ServerLogList": ["row 1", "row 2"]}
# Client: mGlobal.pyOpenRPA.ServerLogListHashStr # Client: mGlobal.pyOpenRPA.ServerLogListHashStr
# Client: mGlobal.pyOpenRPA.ServerLogList # Client: mGlobal.pyOpenRPA.ServerLogList
def pyOpenRPA_ServerLog(inRequest,inGSDict): async def pyOpenRPA_ServerLog(inRequest,inGSDict):
# Extract the hash value from request # Extract the hash value from request
lValueStr = inRequest.body lValueStr = inRequest.body
# Generate ServerDataDict # Generate ServerDataDict
@ -183,7 +196,7 @@ def pyOpenRPA_ServerLog(inRequest,inGSDict):
if lValueStr!=lServerLogListHashStr and lServerLogListHashStr!= "" and lServerLogListHashStr!= None: # Case if Hash is not equal Fix because None can be obtained without JSON decode if lValueStr!=lServerLogListHashStr and lServerLogListHashStr!= "" and lServerLogListHashStr!= None: # Case if Hash is not equal Fix because None can be obtained without JSON decode
lFlagDoGenerateBool = False lFlagDoGenerateBool = False
else: # Case Hashes are equal else: # Case Hashes are equal
time.sleep(inGSDict["Client"]["DumpLogListRefreshIntervalSecFloat"]) asyncio.sleep(inGSDict["Client"]["DumpLogListRefreshIntervalSecFloat"])
# Return the result if Hash is changed # Return the result if Hash is changed
lResult = {"HashStr": lServerLogListHashStr, "ServerLogList": lServerLogList} lResult = {"HashStr": lServerLogListHashStr, "ServerLogList": lServerLogList}
inResponseDict = inRequest.OpenRPAResponseDict inResponseDict = inRequest.OpenRPAResponseDict
@ -512,9 +525,9 @@ def SettingsUpdate():
#{"Method": "GET", "URL": "/pyOpenRPA_logo.png", "MatchType": "Equal", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\pyOpenRPA_logo.png"), "ResponseContentType": "image/png", "UACBool":False, "UseCacheBool": True}, #{"Method": "GET", "URL": "/pyOpenRPA_logo.png", "MatchType": "Equal", "ResponseFilePath": os.path.join(lOrchestratorFolder, "..\\Resources\\Web\\pyOpenRPA_logo.png"), "ResponseContentType": "image/png", "UACBool":False, "UseCacheBool": True},
{"Method": "POST", "URL": "/orpa/client/user-role-hierarchy-get", "MatchType": "Equal","ResponseDefRequestGlobal": BackwardCompatibility.v1_2_0_UserRoleHierarchyGet, "ResponseContentType": "application/json"}, {"Method": "POST", "URL": "/orpa/client/user-role-hierarchy-get", "MatchType": "Equal","ResponseDefRequestGlobal": BackwardCompatibility.v1_2_0_UserRoleHierarchyGet, "ResponseContentType": "application/json"},
# New way of the v.1.2.0 functionallity (all defs by the URL from /pyOpenRPA/...) # New way of the v.1.2.0 functionallity (all defs by the URL from /pyOpenRPA/...)
{"Method": "POST", "URL": "/orpa/client/server-data", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerData, "ResponseContentType": "application/json"}, #{"Method": "POST", "URL": "/orpa/client/server-data", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerData, "ResponseContentType": "application/json"},
{"Method": "GET", "URL": "/orpa/client/server-js-init", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerJSInit, "ResponseContentType": "application/javascript"}, {"Method": "GET", "URL": "/orpa/client/server-js-init", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerJSInit, "ResponseContentType": "application/javascript"},
{"Method": "POST", "URL": "/orpa/client/server-log", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerLog, "ResponseContentType": "application/json"}, #{"Method": "POST", "URL": "/orpa/client/server-log", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_ServerLog, "ResponseContentType": "application/json"},
#{"Method": "GET", "URL": "/orpa/client/screenshot-get", "MatchType": "Equal", "ResponseDefRequestGlobal": pyOpenRPA_Screenshot, "ResponseContentType": "image/png"}, #{"Method": "GET", "URL": "/orpa/client/screenshot-get", "MatchType": "Equal", "ResponseDefRequestGlobal": pyOpenRPA_Screenshot, "ResponseContentType": "image/png"},
# API # API
#{"Method": "POST", "URL": "/orpa/api/processor-queue-add", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_Processor, "ResponseContentType": "application/json"}, #{"Method": "POST", "URL": "/orpa/api/processor-queue-add", "MatchType": "Equal","ResponseDefRequestGlobal": pyOpenRPA_Processor, "ResponseContentType": "application/json"},

@ -905,6 +905,151 @@ def UACSuperTokenUpdate(inSuperTokenStr, inGSettings=None):
{inSuperTokenStr:{"User":lLoginStr, "Domain":"", "TokenDatetime": datetime.datetime.now(), "FlagDoNotExpire":True}} {inSuperTokenStr:{"User":lLoginStr, "Domain":"", "TokenDatetime": datetime.datetime.now(), "FlagDoNotExpire":True}}
) )
## GSettings defs
from . import SettingsTemplate
GSettings = SettingsTemplate.Create(inModeStr = "BASIC")
# Modules alias for pyOpenRPA.Orchestrator and pyOpenRPA.Orchestrator.__Orchestrator__
lCurrentModule = sys.modules[__name__]
if __name__ == "pyOpenRPA.Orchestrator" and "pyOpenRPA.Orchestrator.__Orchestrator__" not in sys.modules:
sys.modules["pyOpenRPA.Orchestrator.__Orchestrator__"] = lCurrentModule
if __name__ == "pyOpenRPA.Orchestrator.__Orchestrator__" and "pyOpenRPA.Orchestrator" not in sys.modules:
sys.modules["pyOpenRPA.Orchestrator"] = lCurrentModule
def GSettingsGet(inGSettings=None):
"""L+,W+: Вернуть глобальный словарь настроек Оркестратора. Во время выполнения программы глобальный словарь настроек существует в единственном экземпляре (синглтон)
:param inGSettings: Дополнительный словарь настроек, который необходимо добавить в основной глобальный словарь настроек Оркестратора (синглтон)
:return: Глобальный словарь настроек GSettings
"""
global GSettings # identify the global variable
# Merge dictionaries if some new dictionary has come
if inGSettings is not None and GSettings is not inGSettings:
GSettings = Dictionary.MergeNoException(in1Dict = inGSettings, in2Dict = GSettings)
return GSettings # Return the result
def GSettingsKeyListValueSet(inValue, inKeyList=None, inGSettings = None):
"""L+,W+: Установить значение из глобального словаря настроек Оркестратора GSettings по списку ключей.
Пример: Для того, чтобы установить значение для ключа car в словаре {"complex":{"car":"green"}, "simple":"HELLO"}, необходимо передать список ключей: ["complex", "car"]
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inValue: Значение для установки в глобальный словарь настроек Оркестратора GSettings
:param inKeyList: Список ключей, по адресу которого установить значение в GSettings
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
lDict[inKeyList[-1]] = inValue #Set value
return True
def GSettingsKeyListValueGet(inKeyList=None, inGSettings = None):
"""L+,W+: Получить значение из глобального словаря настроек Оркестратора GSettings по списку ключей.
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inKeyList: Список ключей, по адресу которого получить значение из GSettings
:return: Значение (тип данных определяется форматом хранения в GSettings)
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
return lDict.get(inKeyList[-1],None)
def GSettingsKeyListValueAppend(inValue, inKeyList=None, inGSettings = None):
"""L+,W+: Применить операцию .append к обьекту, расположенному по адресу списка ключей inKeyList в глобальном словаре настроек Оркестратора GSettings. Пример: Добавить значение в конец списка (list).
.. code-block:: python
# ПРИМЕР
from pyOpenRPA import Orchestrator
Orchestrator.GSettingsKeyListValueAppend(
inGSettings = gSettings,
inValue = "NewValue",
inKeyList=["NewKeyDict","NewKeyList"]):
# result inGSettings: {
# ... another keys in gSettings ...,
# "NewKeyDict":{
# "NewKeyList":[
# "NewValue"
# ]
# }
#}
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inValue: Значение для установки в глобальный словарь настроек Оркестратора GSettings
:param inKeyList: Список ключей, по адресу которого выполнить добавление в конец списка (list)
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
lDict[inKeyList[-1]].append(inValue) #Set value
return True
def GSettingsKeyListValueOperatorPlus(inValue, inKeyList=None, inGSettings = None):
"""L+,W+: Применить оператор сложения (+) к обьекту, расположенному по адресу списка ключей inKeyList в глобальном словаре настроек Оркестратора GSettings. Пример: соединить 2 списка (list).
.. code-block:: python
# ПРИМЕР
from pyOpenRPA import Orchestrator
Orchestrator.GSettingsKeyListValueOperatorPlus(
inGSettings = gSettings,
inValue = [1,2,3],
inKeyList=["NewKeyDict","NewKeyList"]):
# result inGSettings: {
# ... another keys in gSettings ...,
# "NewKeyDict":{
# "NewKeyList":[
# "NewValue",
# 1,
# 2,
# 3
# ]
# }
#}
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inValue: Значение для установки в глобальный словарь настроек Оркестратора GSettings
:param inKeyList: Список ключей, по адресу которого выполнить добавление в конец списка (list)
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
lDict[inKeyList[-1]] += inValue #Set value
return True
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# OrchestratorWeb defs # OrchestratorWeb defs
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@ -1170,13 +1315,11 @@ def WebRequestGet():
lCurrentThread = threading.current_thread() lCurrentThread = threading.current_thread()
if hasattr(lCurrentThread, "request"): if hasattr(lCurrentThread, "request"):
return lCurrentThread.request return lCurrentThread.request
from fastapi import FastAPI from fastapi import FastAPI
def WebAppGet() -> FastAPI: def WebAppGet() -> FastAPI:
"""L+,W+: Вернуть экземпляр веб сервера fastapi.FastAPI (app). Подробнее про app см. https://fastapi.tiangolo.com/tutorial/first-steps/ """L+,W+: Вернуть экземпляр веб сервера fastapi.FastAPI (app). Подробнее про app см. https://fastapi.tiangolo.com/tutorial/first-steps/
""" """
return Server.app return Server.app
def WebUserLoginGet(inAuthTokenStr: str=None) -> str: def WebUserLoginGet(inAuthTokenStr: str=None) -> str:
"""L+,W+: Получить логин авторизованного пользователя. Если авторизация не производилась - вернуть None """L+,W+: Получить логин авторизованного пользователя. Если авторизация не производилась - вернуть None
@ -1197,22 +1340,24 @@ def WebUserDomainGet(inAuthTokenStr: str=None) -> str:
:return: Домен пользователя :return: Домен пользователя
:rtype: str :rtype: str
""" """
if inAuthTokenStr is None: return None if inAuthTokenStr is None: return None
inGS = GSettingsGet() # Get the global settings inGS = GSettingsGet() # Get the global settings
return inGS.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("Domain", None) return inGS.get("ServerDict", {}).get("AccessUsers", {}).get("AuthTokensDict", {}).get(inAuthTokenStr, {}).get("Domain", None)
def WebUserInfoGet(inRequest=None): def WebUserInfoGet(inAuthTokenStr=None):
"""L+,W+: [ПРЕКРАЩЕНИЕ ПОДДЕРЖКИ В 1.3.2, см. WebUserLoginGet, WebUserDomainGet] Информация о пользователе, который отправил HTTP запрос. """L+,W+: Информация о пользователе, который отправил HTTP запрос.
:param inRequest: Экземпляр HTTP request. Опционален, если сообщение фиксируется из под потока, который был инициирован запросом пользователя :param inRequest: Экземпляр HTTP request. Опционален, если сообщение фиксируется из под потока, который был инициирован запросом пользователя
:return: Сведения в формате {"DomainUpperStr": "PYOPENRPA", "UserNameUpperStr": "IVAN.MASLOV"} :return: Сведения в формате {"DomainUpperStr": "PYOPENRPA", "UserNameUpperStr": "IVAN.MASLOV"}
""" """
if inRequest is None: inRequest = WebRequestGet()
try: try:
lDomainUpperStr = inRequest.OpenRPA["Domain"].upper() lResultDict = {
lUserUpperStr = inRequest.OpenRPA["User"].upper() "DomainUpperStr": WebUserDomainGet(inAuthTokenStr=inAuthTokenStr).upper(),
return {"DomainUpperStr": lDomainUpperStr, "UserNameUpperStr": lUserUpperStr} "UserNameUpperStr": WebUserLoginGet(inAuthTokenStr=inAuthTokenStr).upper()
}
return lResultDict
except Exception as e: except Exception as e:
return {"DomainUpperStr": None, "UserNameUpperStr": None} return {"DomainUpperStr": None, "UserNameUpperStr": None}
@ -1270,149 +1415,7 @@ def WebUserUACCheck(inAuthTokenStr:str=None, inKeyList:list=None) -> bool:
break # Close the loop break # Close the loop
return lResult # Return the result return lResult # Return the result
## GSettings defs
from . import SettingsTemplate
GSettings = SettingsTemplate.Create(inModeStr = "BASIC")
# Modules alias for pyOpenRPA.Orchestrator and pyOpenRPA.Orchestrator.__Orchestrator__
lCurrentModule = sys.modules[__name__]
if __name__ == "pyOpenRPA.Orchestrator" and "pyOpenRPA.Orchestrator.__Orchestrator__" not in sys.modules:
sys.modules["pyOpenRPA.Orchestrator.__Orchestrator__"] = lCurrentModule
if __name__ == "pyOpenRPA.Orchestrator.__Orchestrator__" and "pyOpenRPA.Orchestrator" not in sys.modules:
sys.modules["pyOpenRPA.Orchestrator"] = lCurrentModule
def GSettingsGet(inGSettings=None):
"""L+,W+: Вернуть глобальный словарь настроек Оркестратора. Во время выполнения программы глобальный словарь настроек существует в единственном экземпляре (синглтон)
:param inGSettings: Дополнительный словарь настроек, который необходимо добавить в основной глобальный словарь настроек Оркестратора (синглтон)
:return: Глобальный словарь настроек GSettings
"""
global GSettings # identify the global variable
# Merge dictionaries if some new dictionary has come
if inGSettings is not None and GSettings is not inGSettings:
GSettings = Dictionary.MergeNoException(in1Dict = inGSettings, in2Dict = GSettings)
return GSettings # Return the result
def GSettingsKeyListValueSet(inValue, inKeyList=None, inGSettings = None):
"""L+,W+: Установить значение из глобального словаря настроек Оркестратора GSettings по списку ключей.
Пример: Для того, чтобы установить значение для ключа car в словаре {"complex":{"car":"green"}, "simple":"HELLO"}, необходимо передать список ключей: ["complex", "car"]
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inValue: Значение для установки в глобальный словарь настроек Оркестратора GSettings
:param inKeyList: Список ключей, по адресу которого установить значение в GSettings
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
lDict[inKeyList[-1]] = inValue #Set value
return True
def GSettingsKeyListValueGet(inKeyList=None, inGSettings = None):
"""L+,W+: Получить значение из глобального словаря настроек Оркестратора GSettings по списку ключей.
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inKeyList: Список ключей, по адресу которого получить значение из GSettings
:return: Значение (тип данных определяется форматом хранения в GSettings)
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
return lDict.get(inKeyList[-1],None)
def GSettingsKeyListValueAppend(inValue, inKeyList=None, inGSettings = None):
"""L+,W+: Применить операцию .append к обьекту, расположенному по адресу списка ключей inKeyList в глобальном словаре настроек Оркестратора GSettings. Пример: Добавить значение в конец списка (list).
.. code-block:: python
# ПРИМЕР
from pyOpenRPA import Orchestrator
Orchestrator.GSettingsKeyListValueAppend(
inGSettings = gSettings,
inValue = "NewValue",
inKeyList=["NewKeyDict","NewKeyList"]):
# result inGSettings: {
# ... another keys in gSettings ...,
# "NewKeyDict":{
# "NewKeyList":[
# "NewValue"
# ]
# }
#}
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inValue: Значение для установки в глобальный словарь настроек Оркестратора GSettings
:param inKeyList: Список ключей, по адресу которого выполнить добавление в конец списка (list)
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
lDict[inKeyList[-1]].append(inValue) #Set value
return True
def GSettingsKeyListValueOperatorPlus(inValue, inKeyList=None, inGSettings = None):
"""L+,W+: Применить оператор сложения (+) к обьекту, расположенному по адресу списка ключей inKeyList в глобальном словаре настроек Оркестратора GSettings. Пример: соединить 2 списка (list).
.. code-block:: python
# ПРИМЕР
from pyOpenRPA import Orchestrator
Orchestrator.GSettingsKeyListValueOperatorPlus(
inGSettings = gSettings,
inValue = [1,2,3],
inKeyList=["NewKeyDict","NewKeyList"]):
# result inGSettings: {
# ... another keys in gSettings ...,
# "NewKeyDict":{
# "NewKeyList":[
# "NewValue",
# 1,
# 2,
# 3
# ]
# }
#}
:param inGSettings: Глобальный словарь настроек Оркестратора (синглтон)
:param inValue: Значение для установки в глобальный словарь настроек Оркестратора GSettings
:param inKeyList: Список ключей, по адресу которого выполнить добавление в конец списка (list)
"""
inGSettings = GSettingsGet(inGSettings=inGSettings) # Set the global settings
if inKeyList is None: inKeyList = []
lDict = inGSettings
for lItem2 in inKeyList[:-1]:
#Check if key - value exists
if lItem2 in lDict:
pass
else:
lDict[lItem2]={}
lDict=lDict[lItem2]
lDict[inKeyList[-1]] += inValue #Set value
return True
def StorageRobotExists(inRobotNameStr): def StorageRobotExists(inRobotNameStr):
"""L+,W+: Проверить, существует ли ключ inRobotNameStr в хранилище пользовательской информации StorageDict (GSettings > StarageDict) """L+,W+: Проверить, существует ли ключ inRobotNameStr в хранилище пользовательской информации StorageDict (GSettings > StarageDict)

Loading…
Cancel
Save