From b5f67a67ddb1c48644b85ec6ca3cc2d889b66797 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Sat, 30 Nov 2019 15:28:34 +0300 Subject: [PATCH] - OrchestratorConnector: Was created from IntegrationOrchestrator - OrchestratorConnector: Created Async methods - Orchestrator minor bug fix in SettingsExample - OrchestratorConnector: Add AuthToken authorization (Supertoken) - Orchestrator: check login password correct if auth token is not correct - checked! - OrchestratorConnector: Fix in interaction between Orchestrator and .py module - Orchestrator.Processor: Minor fix in GlobalDictKeyListValueGet - Orchestrator.Processor: Bug fix in GlobalDictKeyListValueSet and GlobalDictKeyListValueGet - Orchestrator: Add supertoken in Settings - OrchestratorConnector: Final refactoring - OrchestratorConnector: If Orchestrator not responding correct - do nothing - OrchestratorConnector: DataSendAsync, DataSendSync, IntervalDataSendAsync, ConfigurationInit - Orchestrator.Processor: Return True if GlobalDictKeyListValueSet and minor fix - OrchestratorConnector: DataReceiveResetAsync, DataSendResetAsync, IntervalDataSendResetAsync, IntervalDataReceiveResetAsync - OrchestratorConnector: ConfigurationInit - init interval functions from dict configuration Signed-off-by: Ivan Maslov --- Orchestrator/Settings/AccessUser_ND.py | 2 - .../Settings/AccessUser_SuperToken.py | 35 ++ Orchestrator/Settings/Settings.py | 7 +- Robot/RobotSettings.py | 25 ++ Sources/pyOpenRPA/Orchestrator/Processor.py | 22 +- Sources/pyOpenRPA/Orchestrator/Server.py | 12 +- .../Robot/IntegrationOrchestrator.py | 46 --- .../pyOpenRPA/Robot/OrchestratorConnector.py | 385 ++++++++++++++++++ Sources/pyOpenRPA/Robot/Test.py | 65 +++ Sources/pyOpenRPA/Robot/Utils/TimerRepeat.py | 28 ++ Sources/pyOpenRPA/Robot/__init__.py | 2 +- 11 files changed, 567 insertions(+), 62 deletions(-) create mode 100644 Orchestrator/Settings/AccessUser_SuperToken.py delete mode 100644 Sources/pyOpenRPA/Robot/IntegrationOrchestrator.py create mode 100644 Sources/pyOpenRPA/Robot/OrchestratorConnector.py create mode 100644 Sources/pyOpenRPA/Robot/Test.py create mode 100644 Sources/pyOpenRPA/Robot/Utils/TimerRepeat.py diff --git a/Orchestrator/Settings/AccessUser_ND.py b/Orchestrator/Settings/AccessUser_ND.py index f489560f..1988e628 100644 --- a/Orchestrator/Settings/AccessUser_ND.py +++ b/Orchestrator/Settings/AccessUser_ND.py @@ -1,6 +1,4 @@ def SettingsUpdate(inDict): - #Append to global list - inDict["Server"]["AccessUsers"]["RuleMethodMatchURLBeforeList"]=inDict["Server"]["AccessUsers"]["RuleMethodMatchURLBeforeList"]+lRuleMethodMatchURLBeforeList ################################################## #""/"ND" MethodMatchURLList l__ND_RuleMethodMatchURLBeforeList={ diff --git a/Orchestrator/Settings/AccessUser_SuperToken.py b/Orchestrator/Settings/AccessUser_SuperToken.py new file mode 100644 index 00000000..f12b2a22 --- /dev/null +++ b/Orchestrator/Settings/AccessUser_SuperToken.py @@ -0,0 +1,35 @@ +import datetime +def SettingsUpdate(inDict): + ################################################## + #""/"SuperToken" MethodMatchURLList + l__SuperToken_RuleMethodMatchURLBeforeList={ + ("","SUPERTOKEN"): { #!!!!!only in upper case!!!! + "MethodMatchURLBeforeList": [ + { + "Method":"GET", + "MatchType":"Beginwith", + "URL":"/", + #"FlagAccessDefRequestGlobalAuthenticate": TestDef + "FlagAccess": True + }, + { + "Method":"POST", + "MatchType":"Beginwith", + "URL":"/", + #"FlagAccessDefRequestGlobalAuthenticate": TestDef + "FlagAccess": True + } + ] + } + } + #Append to global list + inDict["Server"]["AccessUsers"]["RuleDomainUserDict"].update(l__SuperToken_RuleMethodMatchURLBeforeList) + #"":{"User":"", "Domain":"", "TokenDatetime":} + #!!!!!!!!!!!!!!!!!!!!!!! + #Attention: default supertoken is 1992-04-03-0643-ru-b4ff-openrpa52zzz - please change it when you will customize OpenRPA in your company + #!!!!!!!!!!!!!!!!!!!!!!! + inDict["Server"]["AccessUsers"]["AuthTokensDict"].update( + {"1992-04-03-0643-ru-b4ff-openrpa52zzz":{"User":"SuperToken", "Domain":"", "TokenDatetime": datetime.datetime.now(), "FlagDoNotExpire":True}} + ) + #Return current dict + return inDict \ No newline at end of file diff --git a/Orchestrator/Settings/Settings.py b/Orchestrator/Settings/Settings.py index 9a7e7ab8..febbaa2e 100644 --- a/Orchestrator/Settings/Settings.py +++ b/Orchestrator/Settings/Settings.py @@ -85,7 +85,7 @@ def Settings(): "AccessUsers": { #Default - all URL is blocked "FlagCredentialsAsk": True, #Turn on Authentication "RuleDomainUserDict": { - #("DOMAIN", "USER"): { #upper case + #("DOMAIN", "USER"): { !!!!!only in upper case!!!! # "MethodMatchURLBeforeList": [ # { # "Method":"GET|POST", @@ -107,7 +107,7 @@ def Settings(): # } ], "AuthTokensDict": { - #"":{"User":"", "Domain":"", "TokenDatetime":} + #"":{"User":"", "Domain":"", "TokenDatetime":, "FlagDoNotExpire":True} } }, "URLList":[ #List of available URLs with the orchestrator server @@ -189,7 +189,8 @@ def Settings(): "Logger": logging.getLogger("Orchestrator"), "Storage": { "Robot_R01_help": "Robot data storage in orchestrator env", - "Robot_R01": {} + "Robot_R01": {}, + "R01_OrchestratorToRobot":{"Test2":"Test2"} } } #Создать файл логирования diff --git a/Robot/RobotSettings.py b/Robot/RobotSettings.py index b92b8d66..3267daee 100644 --- a/Robot/RobotSettings.py +++ b/Robot/RobotSettings.py @@ -1,5 +1,6 @@ import logging import datetime +from pyOpenRPA.Robot import OrchestratorConnector #Robot settings def Settings(): import os @@ -14,8 +15,32 @@ def Settings(): "Python64FullPath": None, #Set from user "Python32ProcessName": "OpenRPAUIDesktopX32.exe", #Config set once "Python64ProcessName": "OpenRPAUIDesktopX64.exe" #Config set once + }, + "OrchestratorConnector": { + #Fill below } } + ###################### + #OrchestratorConnector + ###################### + mDict["OrchestratorConnector"]={ + "IntervalDataSendResetAsync": [ + { + "Interval": 2, + "RobotStorage": mGlobal["Storage"], + "RobotStorageKey": "R01_OrchestratorToRobot", + "RobotResetValue": {"Test": "Test"}, + "OrchestratorKeyList": ["Storage", "R01_OrchestratorToRobot"], + "OrchestratorProtocol": "http", + "OrchestratorHost": "localhost", + "OrchestratorPort": 8081, + "OrchestratorAuthToken": "1992-04-03-0643-ru-b4ff-openrpa52zzz" + } + ] + } + #Run OrchestratorConnector initialization + OrchestratorConnector.ConfigurationInit(mDict["OrchestratorConnector"]) + ######################### #Создать файл логирования # add filemode="w" to overwrite if not os.path.exists("Reports"): diff --git a/Sources/pyOpenRPA/Orchestrator/Processor.py b/Sources/pyOpenRPA/Orchestrator/Processor.py index bb525322..6089a62c 100644 --- a/Sources/pyOpenRPA/Orchestrator/Processor.py +++ b/Sources/pyOpenRPA/Orchestrator/Processor.py @@ -97,35 +97,38 @@ def Activity(inActivity): #Обработка команды OrchestratorRestart ########################################################### if lItem["Type"]=="OrchestratorRestart": - os.execl(sys.executable,os.path.abspath(__file__),*sys.argv) + os.execl(sys.executable, os.path.abspath(__file__), *sys.argv) lItem["Result"] = True sys.exit(0) ########################################################### #Обработка команды GlobalDictKeyListValueSet ########################################################### if lItem["Type"]=="GlobalDictKeyListValueSet": + lDict = mGlobalDict for lItem2 in lItem["KeyList"][:-1]: #Check if key - value exists - if lItem2 in mGlobalDict: + if lItem2 in lDict: pass else: - mGlobalDict[lItem2]={} - mGlobalDict=mGlobalDict[lItem2] + lDict[lItem2]={} + lDict=lDict[lItem2] #Set value - mGlobalDict[lItem["KeyList"][-1]]=lItem["value"] + lDict[lItem["KeyList"][-1]]=lItem["Value"] + lItem["Result"] = True ########################################################### #Обработка команды GlobalDictKeyListValueGet ########################################################### if lItem["Type"]=="GlobalDictKeyListValueGet": + lDict = mGlobalDict for lItem2 in lItem["KeyList"][:-1]: #Check if key - value exists - if lItem2 in mGlobalDict: + if lItem2 in lDict: pass else: - mGlobalDict[lItem2]={} - mGlobalDict=mGlobalDict[lItem2] + lDict[lItem2]={} + lDict=lDict[lItem2] #Return value - lItem["Result"]==mGlobalDict.get(lItem["KeyList"][-1],None) + lItem["Result"]=lDict.get(lItem["KeyList"][-1],None) #Определить вид активности lActivityDateTime=inActivity["DateTimeUTCStringStart"] ##################################### @@ -193,6 +196,7 @@ def Activity(inActivity): ################## #Trace activity ################## + #print(mGlobalDict) if mGlobalDict["Processor"].get(f"LogType_{lItem['Type']}",True): #Add activity in TransactionList if it is applicable mGlobalDict["Processor"]["LogList"].append(copy.deepcopy(lItem)) diff --git a/Sources/pyOpenRPA/Orchestrator/Server.py b/Sources/pyOpenRPA/Orchestrator/Server.py index 69365083..ab407598 100644 --- a/Sources/pyOpenRPA/Orchestrator/Server.py +++ b/Sources/pyOpenRPA/Orchestrator/Server.py @@ -88,6 +88,7 @@ def AuthenticateVerify(inRequest): mGlobalDict["Server"]["AccessUsers"]["AuthTokensDict"][lAuthToken] = {} mGlobalDict["Server"]["AccessUsers"]["AuthTokensDict"][lAuthToken]["Domain"] = lResult["Domain"] mGlobalDict["Server"]["AccessUsers"]["AuthTokensDict"][lAuthToken]["User"] = lResult["User"] + mGlobalDict["Server"]["AccessUsers"]["AuthTokensDict"][lAuthToken]["FlagDoNotExpire"] = False mGlobalDict["Server"]["AccessUsers"]["AuthTokensDict"][lAuthToken]["TokenDatetime"] = datetime.datetime.now() #Set-cookie inRequest.OpenRPA["AuthToken"] = lAuthToken @@ -112,6 +113,7 @@ def AuthenticateBlock(inRequest): #return bool True - go execute, False - dont execute def UserAccessCheckBefore(inMethod, inRequest): # Help def - Get access flag from dict + #pdb.set_trace() def HelpGetFlag(inAccessRuleItem, inRequest, inGlobalDict, inAuthenticateDict): if "FlagAccess" in inAccessRuleItem: return inAccessRuleItem["FlagAccess"] @@ -353,6 +355,7 @@ class testHTTPServer_RequestHandler(BaseHTTPRequestHandler): # POST def do_POST(self): # Prepare result dict + #pdb.set_trace() lResponseDict = {"Headers": {}, "SetCookies":{}, "Body": "", "StatusCode": None} self.OpenRPAResponseDict = lResponseDict ##################################### @@ -374,7 +377,7 @@ class testHTTPServer_RequestHandler(BaseHTTPRequestHandler): lFlagUserAccess = True #If need user authentication if mGlobalDict.get("Server", {}).get("AccessUsers", {}).get("FlagCredentialsAsk", False): - lFlagUserAccess = UserAccessCheckBefore("GET", self) + lFlagUserAccess = UserAccessCheckBefore("POST", self) ###################################### if lFlagUserAccess: #Централизованная функция получения запросов/отправки @@ -395,4 +398,11 @@ class testHTTPServer_RequestHandler(BaseHTTPRequestHandler): message = json.dumps(Processor.ActivityListOrDict(lInputObject)) # Write content as utf-8 data self.wfile.write(bytes(message, "utf8")) + return + else: + #Set access denied code + # Send response status code + self.send_response(403) + # Send headers + self.end_headers() return \ No newline at end of file diff --git a/Sources/pyOpenRPA/Robot/IntegrationOrchestrator.py b/Sources/pyOpenRPA/Robot/IntegrationOrchestrator.py deleted file mode 100644 index be03032c..00000000 --- a/Sources/pyOpenRPA/Robot/IntegrationOrchestrator.py +++ /dev/null @@ -1,46 +0,0 @@ -import requests -import grequests -#from requests import async -import json -################################### -##Orchestrator integration module (safe use when orchestrator is turned off) -################################### - -################################################################################ -#Send data to orchestrator (asynchronyous) -#Example: t=IntegrationOrchestrator.DataSend(["Storage","Robot_R01"],{"RunDateTimeString":"Test1","StepCurrentName":"Test2","StepCurrentDuration":"Test333","SafeStopSignal":True},"localhost",8081) -def DataSend(inKeyList,inValue,inOrchestratorHost="localhost",inOrchestratorPort=80): - lURL = f'http://{inOrchestratorHost}:{inOrchestratorPort}/ProcessingRun' - lDataJSON = {"actionList":[{"type":"AdministrationGlobalDictSetKeyListValue","key_list":inKeyList,"value":inValue}]} - #lAsyncList = [] - lResultItem = [grequests.post(lURL, json=lDataJSON)] - return grequests.map(lResultItem) - #lAsyncList.append(lResultItem) - #return async.map(lAsyncList) -################################################################################ -#recieve Data from orchestrator -#t=IntegrationOrchestrator.DataRecieve(["Storage","Robot_R01"],"localhost",8081) -def DataRecieve(inKeyList,inOrchestratorHost="localhost",inOrchestratorPort=80): - lURL = f'http://{inOrchestratorHost}:{inOrchestratorPort}/ProcessingRun' - lDataJSON = {"actionList":[{"type":"AdministrationGlobalDictGetKeyListValue","key_list":inKeyList}]} - try: - lResult = requests.post(lURL, json=lDataJSON) - lResultJSON = json.loads(lResult.text) - return lResultJSON["actionListResult"][0]["value"] - except Exception: - return None -################################################################################ -#Check if orchestrator has safe stop signal -#Example: IntegrationOrchestrator.SafeStopSignalIs(["Storage","Robot_R01","SafeStopSignal"],"localhost",8081) -def SafeStopSignalIs(inKeyList,inOrchestratorHost="localhost",inOrchestratorPort=80): - lResult=False - lResponse=DataRecieve(inKeyList,inOrchestratorHost,inOrchestratorPort) - if lResponse is not None: - lResult = lResponse - return lResult -################################################################################ -#Reset SafeStop signal in orchestrator -#Example: t=IntegrationOrchestrator.SafeStopSignalReset(["Storage","Robot_R01","SafeStopSignal"],"localhost",8081) -def SafeStopSignalReset(inKeyList,inOrchestratorHost="localhost",inOrchestratorPort=80): - lResponse=DataSend(inKeyList,False,inOrchestratorHost,inOrchestratorPort) - return lResponse \ No newline at end of file diff --git a/Sources/pyOpenRPA/Robot/OrchestratorConnector.py b/Sources/pyOpenRPA/Robot/OrchestratorConnector.py new file mode 100644 index 00000000..04970093 --- /dev/null +++ b/Sources/pyOpenRPA/Robot/OrchestratorConnector.py @@ -0,0 +1,385 @@ +import requests +#Logging +import os +import logging +import datetime +import copy +from .Utils import TimerRepeat # Timer which can repeating +mLogger=logging.getLogger("OrchestratorConnector") +######################### +# Создать файл логирования +# add filemode="w" to overwrite +if not os.path.exists("Reports"): + os.makedirs("Reports") +########################## +# Подготовка логгера Robot +######################### +mLogger.setLevel(logging.INFO) +# create the logging file handler +mLoggerFH = logging.FileHandler("Reports\ReportOrchestratorConnector_" + datetime.datetime.now().strftime("%Y_%m_%d") + ".log") +mLoggerFormatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +mLoggerFH.setFormatter(mLoggerFormatter) +# add handler to logger object +mLogger.addHandler(mLoggerFH) +############################################ +#from requests import async +import json +################################### +##Orchestrator integration module (safe use when orchestrator is turned off) +################################### +################################################################################ +# Recieve data from orchestrator (synchronyous) +# Example: +# t=IntegrationOrchestrator.DataRecieveAsync( +# RobotStorage=mGlobal["Storage"], +# RobotStorageKey="R01_OrchestratorToRobot", +# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], +# OrchestratorProtocol="http", +# OrchestratorHost="localhost", +# OrchestratorPort=8081, +# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" +# ) +def DataReceiveSync( + OrchestratorKeyList, OrchestratorProtocol="http", + OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None + ): + lCookies = {} + # Set auth token if authorization is needed + if OrchestratorAuthToken: + lCookies["AuthToken"] = OrchestratorAuthToken + lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor' + lDataJSON = [{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}] + try: + lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies) + lResultJSON = json.loads(lResult.text) + return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data) + except Exception: + mLogger.warning( + f"Orchestrator not responding. Def DataRecieveSync, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}") + return (False, None) # (Flag response is not ok, Data None) +################################################################################ +# Recieve data from orchestrator (asynchronyous) +# Example: +# t=IntegrationOrchestrator.DataRecieveAsync( +# RobotStorage=mGlobal["Storage"], +# RobotStorageKey="R01_OrchestratorToRobot", +# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], +# OrchestratorProtocol="http", +# OrchestratorHost="localhost", +# OrchestratorPort=8081, +# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" +# ) +def DataReceiveAsync( + RobotStorage, RobotStorageKey, OrchestratorKeyList, OrchestratorProtocol="http", + OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None + ): + from threading import Thread + import uuid + global mGlobalDict + class ThreadAsync(Thread): + def DataRecieveSync(self): + lCookies = {} + #Set auth token if authorization is needed + if OrchestratorAuthToken: + lCookies["AuthToken"] = OrchestratorAuthToken + lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor' + lDataJSON = [{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}] + try: + lResult = requests.post(lURL, json=lDataJSON, cookies = lCookies) + lResultJSON = json.loads(lResult.text) + return (True,lResultJSON[0]["Result"]) #(Flag response is ok, Data) + except Exception: + mLogger.warning( + f"Orchestrator not responding. Def DataRecieveAsync, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}") + return (False,None) #(Flag response is not ok, Data None) + # Thread init + def __init__(self, name): + Thread.__init__(self) + self.name = name + #Thread start + def run(self): + (lFlagResponseOK,lResponseData) = self.DataRecieveSync() + if lFlagResponseOK: + RobotStorage[RobotStorageKey] = lResponseData + ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}") + ThreadObject.start() + return True +################################################################################ +#IntervalDataRecieveAsync - Periodic recieve data from orchestrator and update storage +def IntervalDataReceiveAsync(*args, **kwargs): + lInterval=3 + #Delete index 0 from args + lArgs=copy.copy(args) + if len(lArgs)>0: + lInterval = lArgs[0] + lArgs = lArgs[1:] + #Delete Interval from kwargs + lKwargs = copy.copy(kwargs) + if "Interval" in lKwargs: + lInterval = lKwargs["Interval"] + del lKwargs["Interval"] + lTimer = TimerRepeat.TimerRepeat(lInterval, DataReceiveAsync, lArgs, lKwargs) + lTimer.start() + return lTimer +################################################################################ +################################### +################################ +################################### +################################################################################ +# Send data from orchestrator (synchronyous) +# Example: +# t=IntegrationOrchestrator.DataSendSync( +# RobotValue="Value", +# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], +# OrchestratorProtocol="http", +# OrchestratorHost="localhost", +# OrchestratorPort=8081, +# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" +# ) +def DataSendSync( + RobotValue, OrchestratorKeyList, OrchestratorProtocol="http", + OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None + ): + lCookies = {} + # Set auth token if authorization is needed + if OrchestratorAuthToken: + lCookies["AuthToken"] = OrchestratorAuthToken + lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor' + lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}] + try: + lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies) + lResultJSON = json.loads(lResult.text) + return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data) + except Exception: + mLogger.warning( + f"Orchestrator not responding. Def: DataSendSync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}") + return (False, None) # (Flag response is not ok, Data None) +################################################################################ +# Send data from orchestrator (asynchronyous) +# Example: +# t=IntegrationOrchestrator.DataSendAsync( +# RobotStorage=mGlobal["Storage"], +# RobotStorageKey="R01_OrchestratorToRobot", +# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], +# OrchestratorProtocol="http", +# OrchestratorHost="localhost", +# OrchestratorPort=8081, +# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" +# ) +def DataSendAsync( + RobotStorage, RobotStorageKey, OrchestratorKeyList, OrchestratorProtocol="http", + OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None + ): + from threading import Thread + import uuid + global mGlobalDict + class ThreadAsync(Thread): + def DataSendSync(self): + RobotValue = RobotStorage[RobotStorageKey] + lCookies = {} + # Set auth token if authorization is needed + if OrchestratorAuthToken: + lCookies["AuthToken"] = OrchestratorAuthToken + lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor' + lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}] + try: + lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies) + lResultJSON = json.loads(lResult.text) + return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data) + except Exception: + mLogger.warning( + f"Orchestrator not responding. Def: DataSendAsync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}") + return (False, None) # (Flag response is not ok, Data None) + # Thread init + def __init__(self, name): + Thread.__init__(self) + self.name = name + #Thread start + def run(self): + self.DataSendSync() + ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}") + ThreadObject.start() + return True +################################################################################ +#IntervalDataSendAsync - Periodic send data from robot to orchestrator +def IntervalDataSendAsync(*args,**kwargs): + lInterval=3 + #Delete index 0 from args + lArgs=copy.copy(args) + if len(lArgs)>0: + lInterval = lArgs[0] + lArgs = lArgs[1:] + #Delete Interval from kwargs + lKwargs = copy.copy(kwargs) + if "Interval" in lKwargs: + lInterval = lKwargs["Interval"] + del lKwargs["Interval"] + lTimer = TimerRepeat.TimerRepeat(lInterval, DataSendAsync, lArgs, lKwargs) + lTimer.start() + return lTimer +################################################################################ +################################### +################################ +################################### +################################################################################ +# Check if RobotStorage[Key] Value has been changed > then send data + reset to orchestrator (asynchronyous) timeout 2 seconds +# Example: +# t=IntegrationOrchestrator.DataSendResetAsync( +# RobotStorage=mGlobal["Storage"], +# RobotStorageKey="R01_OrchestratorToRobot", +# RobotResetValue="Test", +# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], +# OrchestratorProtocol="http", +# OrchestratorHost="localhost", +# OrchestratorPort=8081, +# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" +# ) +def DataSendResetAsync( + RobotStorage, RobotStorageKey, RobotResetValue, OrchestratorKeyList, OrchestratorProtocol="http", + OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None + ): + #Do operations if data not equal to ResetValue + if RobotStorage[RobotStorageKey] != RobotResetValue: + #Get value + lRobotValue = copy.deepcopy(RobotStorage[RobotStorageKey]) + #Reset value + RobotStorage[RobotStorageKey] = copy.deepcopy(RobotResetValue) + #Send data (retry while data will be transferred completele) + from threading import Thread + import uuid + import time + global mGlobalDict + class ThreadAsync(Thread): + def DataSendSync(self): + RobotValue = lRobotValue + lCookies = {} + # Set auth token if authorization is needed + if OrchestratorAuthToken: + lCookies["AuthToken"] = OrchestratorAuthToken + lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor' + lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}] + lFlagDataTransmit = False + while not lFlagDataTransmit: + try: + lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies) + lResultJSON = json.loads(lResult.text) + lFlagDataTransmit = True + except Exception: + mLogger.warning( + f"Orchestrator not responding - will retry to send update. Timeout 2 seconds. Def: DataSendResetAsync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}") + time.sleep(2) #Timout for next loop + return (True,True) # Only True can be returned + # Thread init + def __init__(self, name): + Thread.__init__(self) + self.name = name + # Thread start + def run(self): + self.DataSendSync() + ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}") + ThreadObject.start() + return True + return True +################################################################################ +################################################################################ +#IntervalDataSendResetAsync - Periodic check changed and send + reset data from robot to orchestrator +def IntervalDataSendResetAsync(*args,**kwargs): + lInterval=3 + #Delete index 0 from args + lArgs=copy.copy(args) + if len(lArgs)>0: + lInterval = lArgs[0] + lArgs = lArgs[1:] + #Delete Interval from kwargs + lKwargs = copy.copy(kwargs) + if "Interval" in lKwargs: + lInterval = lKwargs["Interval"] + del lKwargs["Interval"] + lTimer = TimerRepeat.TimerRepeat(lInterval, DataSendResetAsync, lArgs, lKwargs) + lTimer.start() + return lTimer +################################################################################ +# Check changes in orchestrator - then replace in RobotStorage if not equeal. Has no timeout because You can use function IntervalDataReceiveResetAsync (asynchronyous) +#Next iteration do not rewrite value until new change has come from orchestrator +# Example: +# t=IntegrationOrchestrator.DataRecieveAsync( +# RobotStorage=mGlobal["Storage"], +# RobotStorageKey="R01_OrchestratorToRobot", +# RobotResetValue={"Test":"Test"}, +# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], +# OrchestratorProtocol="http", +# OrchestratorHost="localhost", +# OrchestratorPort=8081, +# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" +# ) +def DataReceiveResetAsync( + RobotStorage, RobotStorageKey, RobotResetValue, OrchestratorKeyList, OrchestratorProtocol="http", + OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None + ): + from threading import Thread + import uuid + global mGlobalDict + class ThreadAsync(Thread): + def DataRecieveSync(self): + lCookies = {} + #Set auth token if authorization is needed + if OrchestratorAuthToken: + lCookies["AuthToken"] = OrchestratorAuthToken + lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor' + lDataJSON = [ + {"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}, + {"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotResetValue} + ] + try: + lResult = requests.post(lURL, json=lDataJSON, cookies = lCookies) + lResultJSON = json.loads(lResult.text) + #Change data if it changes with ResetValue + if lResultJSON[0]["Result"] != RobotResetValue: + return (True,lResultJSON[0]["Result"]) #(Flag data changes is ok, Data) + else: + return (False, lResultJSON[0]["Result"]) # (Flag data changes is false - dont rewrite in RobotStorage, Data) + except Exception: + mLogger.warning( + f"Orchestrator not responding. Def DataReceiveResetAsync, RobotResetValue: {str(RobotResetValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}") + return (False,None) #(Flag response is not ok, Data None) + # Thread init + def __init__(self, name): + Thread.__init__(self) + self.name = name + #Thread start + def run(self): + (lFlagResponseOK,lResponseData) = self.DataRecieveSync() + if lFlagResponseOK: + RobotStorage[RobotStorageKey] = lResponseData + ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}") + ThreadObject.start() + return True +################################################################################ +################################################################################ +#IntervalDataReceiveResetAsync - Periodic receive + every time reset and check changed and reset data on robot storage +def IntervalDataReceiveResetAsync(*args,**kwargs): + lInterval=3 + #Delete index 0 from args + lArgs=copy.copy(args) + if len(lArgs)>0: + lInterval = lArgs[0] + lArgs = lArgs[1:] + #Delete Interval from kwargs + lKwargs = copy.copy(kwargs) + if "Interval" in lKwargs: + lInterval = lKwargs["Interval"] + del lKwargs["Interval"] + lTimer = TimerRepeat.TimerRepeat(lInterval, DataReceiveResetAsync, lArgs, lKwargs) + lTimer.start() + return lTimer +################################################################################# +################################################################################# +################################################################################ +#ConfigurationInit - Get dict configuration and init interval functions +def ConfigurationInit(inConfigurationDict): + for lItem in inConfigurationDict.keys(): + lFunction = globals()[lItem] + #Iterate throught the nested list + for lFunctionConfigurationDict in inConfigurationDict[lItem]: + lFunction(**lFunctionConfigurationDict) + return True \ No newline at end of file diff --git a/Sources/pyOpenRPA/Robot/Test.py b/Sources/pyOpenRPA/Robot/Test.py new file mode 100644 index 00000000..9f8463d7 --- /dev/null +++ b/Sources/pyOpenRPA/Robot/Test.py @@ -0,0 +1,65 @@ +import unittest +from threading import Timer +import sys +lFolderPath = "/".join(__file__.split("\\")[:-3]) +sys.path.insert(0, lFolderPath) +from pyOpenRPA.Robot import OrchestratorConnector +from pyOpenRPA.Robot import Utils +class MyTestCase(unittest.TestCase): + def test_something(self): + #self.assertEqual(True, False) + mGlobal={"Storage":{"R01_OrchestratorToRobot":{"Test":"Test2"}}} + # t=OrchestratorConnector.IntervalDataSendAsync( + # Interval=1, + # RobotStorage=mGlobal["Storage"], + # RobotStorageKey="R01_OrchestratorToRobot", + # OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], + # OrchestratorProtocol="http", + # OrchestratorHost="localhost", + # OrchestratorPort=8081, + # OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" + # ) + # t=OrchestratorConnector.DataSendSync( + # RobotValue="Test", + # OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], + # OrchestratorProtocol="http", + # OrchestratorHost="localhost", + # OrchestratorPort=8081, + # OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" + # ) + import time + #def Func(lT,inl): + # print(lT) + # return True + #lTimer= Utils.TimerRepeat.TimerRepeat(1, Func, ["dddd"],{"inl":9}) + #lTimer.start() + OrchestratorConnector.ConfigurationInit({ + "IntervalDataSendResetAsync": [ + { + "Interval": 2, + "RobotStorage": mGlobal["Storage"], + "RobotStorageKey": "R01_OrchestratorToRobot", + "RobotResetValue": {"Test": "Test"}, + "OrchestratorKeyList": ["Storage", "R01_OrchestratorToRobot"], + "OrchestratorProtocol": "http", + "OrchestratorHost": "localhost", + "OrchestratorPort": 8081, + "OrchestratorAuthToken": "1992-04-03-0643-ru-b4ff-openrpa52zzz" + } + ] + }) + while True: + print(mGlobal["Storage"]["R01_OrchestratorToRobot"]) + # t = OrchestratorConnector.DataSendResetAsync( + # RobotStorage=mGlobal["Storage"], + # RobotStorageKey="R01_OrchestratorToRobot", + # RobotResetValue={"Test": "Test"}, + # OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"], + # OrchestratorProtocol="http", + # OrchestratorHost="localhost", + # OrchestratorPort=8081, + # OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz" + # ) + time.sleep(0.5) +if __name__ == '__main__': + unittest.main() diff --git a/Sources/pyOpenRPA/Robot/Utils/TimerRepeat.py b/Sources/pyOpenRPA/Robot/Utils/TimerRepeat.py new file mode 100644 index 00000000..afe268f2 --- /dev/null +++ b/Sources/pyOpenRPA/Robot/Utils/TimerRepeat.py @@ -0,0 +1,28 @@ +from threading import Timer +import datetime +# lTimer = RepeatedTimer(3, def, [], {}) # it auto-starts, no need of rt.start() +# if def return None = timer stops +# lTimer.start() +class TimerRepeat(object): + def __init__(self, interval, function, args, kwargs): + self._timer = None + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.is_running = False + self.start() + def _run(self): + self.is_running = False + lResult = self.function(*self.args, **self.kwargs) + if lResult is not None: + if lResult: + self.start() + def start(self): + if not self.is_running: + self._timer = Timer(self.interval, self._run) + self._timer.start() + self.is_running = True + def stop(self): + self._timer.cancel() + self.is_running = False \ No newline at end of file diff --git a/Sources/pyOpenRPA/Robot/__init__.py b/Sources/pyOpenRPA/Robot/__init__.py index 5e1beb6a..0ddf28be 100644 --- a/Sources/pyOpenRPA/Robot/__init__.py +++ b/Sources/pyOpenRPA/Robot/__init__.py @@ -4,7 +4,7 @@ The OpenRPA package (from UnicodeLabs) """ __all__ = [ - 'Clipboard', 'IntegrationOrchestrator', 'Window' + 'Clipboard', 'OrchestratorConnector.py', 'Window' ] __author__ = 'Ivan Maslov ' #from . import UIDesktop