- OrchestratorConnector: Created Async methods - Orchestrator minor bug fix in SettingsExample - OrchestratorConnector: Add AuthToken authorization (Supertoken) - Orchestrator: check login password correct if auth token is not correct - checked! - OrchestratorConnector: Fix in interaction between Orchestrator and .py module - Orchestrator.Processor: Minor fix in GlobalDictKeyListValueGet - Orchestrator.Processor: Bug fix in GlobalDictKeyListValueSet and GlobalDictKeyListValueGet - Orchestrator: Add supertoken in Settings - OrchestratorConnector: Final refactoring - OrchestratorConnector: If Orchestrator not responding correct - do nothing - OrchestratorConnector: DataSendAsync, DataSendSync, IntervalDataSendAsync, ConfigurationInit - Orchestrator.Processor: Return True if GlobalDictKeyListValueSet and minor fix - OrchestratorConnector: DataReceiveResetAsync, DataSendResetAsync, IntervalDataSendResetAsync, IntervalDataReceiveResetAsync - OrchestratorConnector: ConfigurationInit - init interval functions from dict configuration Signed-off-by: Ivan Maslov <Ivan.Maslov@UnicodeLabs.ru>dev-linux
parent
f5d15bec0b
commit
b5f67a67dd
@ -0,0 +1,35 @@
|
||||
import datetime
|
||||
def SettingsUpdate(inDict):
|
||||
##################################################
|
||||
#""/"SuperToken" MethodMatchURLList
|
||||
l__SuperToken_RuleMethodMatchURLBeforeList={
|
||||
("","SUPERTOKEN"): { #!!!!!only in upper case!!!!
|
||||
"MethodMatchURLBeforeList": [
|
||||
{
|
||||
"Method":"GET",
|
||||
"MatchType":"Beginwith",
|
||||
"URL":"/",
|
||||
#"FlagAccessDefRequestGlobalAuthenticate": TestDef
|
||||
"FlagAccess": True
|
||||
},
|
||||
{
|
||||
"Method":"POST",
|
||||
"MatchType":"Beginwith",
|
||||
"URL":"/",
|
||||
#"FlagAccessDefRequestGlobalAuthenticate": TestDef
|
||||
"FlagAccess": True
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
#Append to global list
|
||||
inDict["Server"]["AccessUsers"]["RuleDomainUserDict"].update(l__SuperToken_RuleMethodMatchURLBeforeList)
|
||||
#"<AuthToken>":{"User":"", "Domain":"", "TokenDatetime":<Datetime>}
|
||||
#!!!!!!!!!!!!!!!!!!!!!!!
|
||||
#Attention: default supertoken is 1992-04-03-0643-ru-b4ff-openrpa52zzz - please change it when you will customize OpenRPA in your company
|
||||
#!!!!!!!!!!!!!!!!!!!!!!!
|
||||
inDict["Server"]["AccessUsers"]["AuthTokensDict"].update(
|
||||
{"1992-04-03-0643-ru-b4ff-openrpa52zzz":{"User":"SuperToken", "Domain":"", "TokenDatetime": datetime.datetime.now(), "FlagDoNotExpire":True}}
|
||||
)
|
||||
#Return current dict
|
||||
return inDict
|
@ -1,46 +0,0 @@
|
||||
import requests
|
||||
import grequests
|
||||
#from requests import async
|
||||
import json
|
||||
###################################
|
||||
##Orchestrator integration module (safe use when orchestrator is turned off)
|
||||
###################################
|
||||
|
||||
################################################################################
|
||||
#Send data to orchestrator (asynchronyous)
|
||||
#Example: t=IntegrationOrchestrator.DataSend(["Storage","Robot_R01"],{"RunDateTimeString":"Test1","StepCurrentName":"Test2","StepCurrentDuration":"Test333","SafeStopSignal":True},"localhost",8081)
|
||||
def DataSend(inKeyList,inValue,inOrchestratorHost="localhost",inOrchestratorPort=80):
|
||||
lURL = f'http://{inOrchestratorHost}:{inOrchestratorPort}/ProcessingRun'
|
||||
lDataJSON = {"actionList":[{"type":"AdministrationGlobalDictSetKeyListValue","key_list":inKeyList,"value":inValue}]}
|
||||
#lAsyncList = []
|
||||
lResultItem = [grequests.post(lURL, json=lDataJSON)]
|
||||
return grequests.map(lResultItem)
|
||||
#lAsyncList.append(lResultItem)
|
||||
#return async.map(lAsyncList)
|
||||
################################################################################
|
||||
#recieve Data from orchestrator
|
||||
#t=IntegrationOrchestrator.DataRecieve(["Storage","Robot_R01"],"localhost",8081)
|
||||
def DataRecieve(inKeyList,inOrchestratorHost="localhost",inOrchestratorPort=80):
|
||||
lURL = f'http://{inOrchestratorHost}:{inOrchestratorPort}/ProcessingRun'
|
||||
lDataJSON = {"actionList":[{"type":"AdministrationGlobalDictGetKeyListValue","key_list":inKeyList}]}
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
return lResultJSON["actionListResult"][0]["value"]
|
||||
except Exception:
|
||||
return None
|
||||
################################################################################
|
||||
#Check if orchestrator has safe stop signal
|
||||
#Example: IntegrationOrchestrator.SafeStopSignalIs(["Storage","Robot_R01","SafeStopSignal"],"localhost",8081)
|
||||
def SafeStopSignalIs(inKeyList,inOrchestratorHost="localhost",inOrchestratorPort=80):
|
||||
lResult=False
|
||||
lResponse=DataRecieve(inKeyList,inOrchestratorHost,inOrchestratorPort)
|
||||
if lResponse is not None:
|
||||
lResult = lResponse
|
||||
return lResult
|
||||
################################################################################
|
||||
#Reset SafeStop signal in orchestrator
|
||||
#Example: t=IntegrationOrchestrator.SafeStopSignalReset(["Storage","Robot_R01","SafeStopSignal"],"localhost",8081)
|
||||
def SafeStopSignalReset(inKeyList,inOrchestratorHost="localhost",inOrchestratorPort=80):
|
||||
lResponse=DataSend(inKeyList,False,inOrchestratorHost,inOrchestratorPort)
|
||||
return lResponse
|
@ -0,0 +1,385 @@
|
||||
import requests
|
||||
#Logging
|
||||
import os
|
||||
import logging
|
||||
import datetime
|
||||
import copy
|
||||
from .Utils import TimerRepeat # Timer which can repeating
|
||||
mLogger=logging.getLogger("OrchestratorConnector")
|
||||
#########################
|
||||
# Создать файл логирования
|
||||
# add filemode="w" to overwrite
|
||||
if not os.path.exists("Reports"):
|
||||
os.makedirs("Reports")
|
||||
##########################
|
||||
# Подготовка логгера Robot
|
||||
#########################
|
||||
mLogger.setLevel(logging.INFO)
|
||||
# create the logging file handler
|
||||
mLoggerFH = logging.FileHandler("Reports\ReportOrchestratorConnector_" + datetime.datetime.now().strftime("%Y_%m_%d") + ".log")
|
||||
mLoggerFormatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
mLoggerFH.setFormatter(mLoggerFormatter)
|
||||
# add handler to logger object
|
||||
mLogger.addHandler(mLoggerFH)
|
||||
############################################
|
||||
#from requests import async
|
||||
import json
|
||||
###################################
|
||||
##Orchestrator integration module (safe use when orchestrator is turned off)
|
||||
###################################
|
||||
################################################################################
|
||||
# Recieve data from orchestrator (synchronyous)
|
||||
# Example:
|
||||
# t=IntegrationOrchestrator.DataRecieveAsync(
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
def DataReceiveSync(
|
||||
OrchestratorKeyList, OrchestratorProtocol="http",
|
||||
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
||||
):
|
||||
lCookies = {}
|
||||
# Set auth token if authorization is needed
|
||||
if OrchestratorAuthToken:
|
||||
lCookies["AuthToken"] = OrchestratorAuthToken
|
||||
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
||||
lDataJSON = [{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}]
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data)
|
||||
except Exception:
|
||||
mLogger.warning(
|
||||
f"Orchestrator not responding. Def DataRecieveSync, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
||||
return (False, None) # (Flag response is not ok, Data None)
|
||||
################################################################################
|
||||
# Recieve data from orchestrator (asynchronyous)
|
||||
# Example:
|
||||
# t=IntegrationOrchestrator.DataRecieveAsync(
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
def DataReceiveAsync(
|
||||
RobotStorage, RobotStorageKey, OrchestratorKeyList, OrchestratorProtocol="http",
|
||||
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
||||
):
|
||||
from threading import Thread
|
||||
import uuid
|
||||
global mGlobalDict
|
||||
class ThreadAsync(Thread):
|
||||
def DataRecieveSync(self):
|
||||
lCookies = {}
|
||||
#Set auth token if authorization is needed
|
||||
if OrchestratorAuthToken:
|
||||
lCookies["AuthToken"] = OrchestratorAuthToken
|
||||
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
||||
lDataJSON = [{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}]
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON, cookies = lCookies)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
return (True,lResultJSON[0]["Result"]) #(Flag response is ok, Data)
|
||||
except Exception:
|
||||
mLogger.warning(
|
||||
f"Orchestrator not responding. Def DataRecieveAsync, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
||||
return (False,None) #(Flag response is not ok, Data None)
|
||||
# Thread init
|
||||
def __init__(self, name):
|
||||
Thread.__init__(self)
|
||||
self.name = name
|
||||
#Thread start
|
||||
def run(self):
|
||||
(lFlagResponseOK,lResponseData) = self.DataRecieveSync()
|
||||
if lFlagResponseOK:
|
||||
RobotStorage[RobotStorageKey] = lResponseData
|
||||
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
||||
ThreadObject.start()
|
||||
return True
|
||||
################################################################################
|
||||
#IntervalDataRecieveAsync - Periodic recieve data from orchestrator and update storage
|
||||
def IntervalDataReceiveAsync(*args, **kwargs):
|
||||
lInterval=3
|
||||
#Delete index 0 from args
|
||||
lArgs=copy.copy(args)
|
||||
if len(lArgs)>0:
|
||||
lInterval = lArgs[0]
|
||||
lArgs = lArgs[1:]
|
||||
#Delete Interval from kwargs
|
||||
lKwargs = copy.copy(kwargs)
|
||||
if "Interval" in lKwargs:
|
||||
lInterval = lKwargs["Interval"]
|
||||
del lKwargs["Interval"]
|
||||
lTimer = TimerRepeat.TimerRepeat(lInterval, DataReceiveAsync, lArgs, lKwargs)
|
||||
lTimer.start()
|
||||
return lTimer
|
||||
################################################################################
|
||||
###################################
|
||||
################################
|
||||
###################################
|
||||
################################################################################
|
||||
# Send data from orchestrator (synchronyous)
|
||||
# Example:
|
||||
# t=IntegrationOrchestrator.DataSendSync(
|
||||
# RobotValue="Value",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
def DataSendSync(
|
||||
RobotValue, OrchestratorKeyList, OrchestratorProtocol="http",
|
||||
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
||||
):
|
||||
lCookies = {}
|
||||
# Set auth token if authorization is needed
|
||||
if OrchestratorAuthToken:
|
||||
lCookies["AuthToken"] = OrchestratorAuthToken
|
||||
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
||||
lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}]
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data)
|
||||
except Exception:
|
||||
mLogger.warning(
|
||||
f"Orchestrator not responding. Def: DataSendSync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
||||
return (False, None) # (Flag response is not ok, Data None)
|
||||
################################################################################
|
||||
# Send data from orchestrator (asynchronyous)
|
||||
# Example:
|
||||
# t=IntegrationOrchestrator.DataSendAsync(
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
def DataSendAsync(
|
||||
RobotStorage, RobotStorageKey, OrchestratorKeyList, OrchestratorProtocol="http",
|
||||
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
||||
):
|
||||
from threading import Thread
|
||||
import uuid
|
||||
global mGlobalDict
|
||||
class ThreadAsync(Thread):
|
||||
def DataSendSync(self):
|
||||
RobotValue = RobotStorage[RobotStorageKey]
|
||||
lCookies = {}
|
||||
# Set auth token if authorization is needed
|
||||
if OrchestratorAuthToken:
|
||||
lCookies["AuthToken"] = OrchestratorAuthToken
|
||||
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
||||
lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}]
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data)
|
||||
except Exception:
|
||||
mLogger.warning(
|
||||
f"Orchestrator not responding. Def: DataSendAsync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
||||
return (False, None) # (Flag response is not ok, Data None)
|
||||
# Thread init
|
||||
def __init__(self, name):
|
||||
Thread.__init__(self)
|
||||
self.name = name
|
||||
#Thread start
|
||||
def run(self):
|
||||
self.DataSendSync()
|
||||
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
||||
ThreadObject.start()
|
||||
return True
|
||||
################################################################################
|
||||
#IntervalDataSendAsync - Periodic send data from robot to orchestrator
|
||||
def IntervalDataSendAsync(*args,**kwargs):
|
||||
lInterval=3
|
||||
#Delete index 0 from args
|
||||
lArgs=copy.copy(args)
|
||||
if len(lArgs)>0:
|
||||
lInterval = lArgs[0]
|
||||
lArgs = lArgs[1:]
|
||||
#Delete Interval from kwargs
|
||||
lKwargs = copy.copy(kwargs)
|
||||
if "Interval" in lKwargs:
|
||||
lInterval = lKwargs["Interval"]
|
||||
del lKwargs["Interval"]
|
||||
lTimer = TimerRepeat.TimerRepeat(lInterval, DataSendAsync, lArgs, lKwargs)
|
||||
lTimer.start()
|
||||
return lTimer
|
||||
################################################################################
|
||||
###################################
|
||||
################################
|
||||
###################################
|
||||
################################################################################
|
||||
# Check if RobotStorage[Key] Value has been changed > then send data + reset to orchestrator (asynchronyous) timeout 2 seconds
|
||||
# Example:
|
||||
# t=IntegrationOrchestrator.DataSendResetAsync(
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# RobotResetValue="Test",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
def DataSendResetAsync(
|
||||
RobotStorage, RobotStorageKey, RobotResetValue, OrchestratorKeyList, OrchestratorProtocol="http",
|
||||
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
||||
):
|
||||
#Do operations if data not equal to ResetValue
|
||||
if RobotStorage[RobotStorageKey] != RobotResetValue:
|
||||
#Get value
|
||||
lRobotValue = copy.deepcopy(RobotStorage[RobotStorageKey])
|
||||
#Reset value
|
||||
RobotStorage[RobotStorageKey] = copy.deepcopy(RobotResetValue)
|
||||
#Send data (retry while data will be transferred completele)
|
||||
from threading import Thread
|
||||
import uuid
|
||||
import time
|
||||
global mGlobalDict
|
||||
class ThreadAsync(Thread):
|
||||
def DataSendSync(self):
|
||||
RobotValue = lRobotValue
|
||||
lCookies = {}
|
||||
# Set auth token if authorization is needed
|
||||
if OrchestratorAuthToken:
|
||||
lCookies["AuthToken"] = OrchestratorAuthToken
|
||||
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
||||
lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}]
|
||||
lFlagDataTransmit = False
|
||||
while not lFlagDataTransmit:
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
lFlagDataTransmit = True
|
||||
except Exception:
|
||||
mLogger.warning(
|
||||
f"Orchestrator not responding - will retry to send update. Timeout 2 seconds. Def: DataSendResetAsync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
||||
time.sleep(2) #Timout for next loop
|
||||
return (True,True) # Only True can be returned
|
||||
# Thread init
|
||||
def __init__(self, name):
|
||||
Thread.__init__(self)
|
||||
self.name = name
|
||||
# Thread start
|
||||
def run(self):
|
||||
self.DataSendSync()
|
||||
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
||||
ThreadObject.start()
|
||||
return True
|
||||
return True
|
||||
################################################################################
|
||||
################################################################################
|
||||
#IntervalDataSendResetAsync - Periodic check changed and send + reset data from robot to orchestrator
|
||||
def IntervalDataSendResetAsync(*args,**kwargs):
|
||||
lInterval=3
|
||||
#Delete index 0 from args
|
||||
lArgs=copy.copy(args)
|
||||
if len(lArgs)>0:
|
||||
lInterval = lArgs[0]
|
||||
lArgs = lArgs[1:]
|
||||
#Delete Interval from kwargs
|
||||
lKwargs = copy.copy(kwargs)
|
||||
if "Interval" in lKwargs:
|
||||
lInterval = lKwargs["Interval"]
|
||||
del lKwargs["Interval"]
|
||||
lTimer = TimerRepeat.TimerRepeat(lInterval, DataSendResetAsync, lArgs, lKwargs)
|
||||
lTimer.start()
|
||||
return lTimer
|
||||
################################################################################
|
||||
# Check changes in orchestrator - then replace in RobotStorage if not equeal. Has no timeout because You can use function IntervalDataReceiveResetAsync (asynchronyous)
|
||||
#Next iteration do not rewrite value until new change has come from orchestrator
|
||||
# Example:
|
||||
# t=IntegrationOrchestrator.DataRecieveAsync(
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# RobotResetValue={"Test":"Test"},
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
def DataReceiveResetAsync(
|
||||
RobotStorage, RobotStorageKey, RobotResetValue, OrchestratorKeyList, OrchestratorProtocol="http",
|
||||
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
||||
):
|
||||
from threading import Thread
|
||||
import uuid
|
||||
global mGlobalDict
|
||||
class ThreadAsync(Thread):
|
||||
def DataRecieveSync(self):
|
||||
lCookies = {}
|
||||
#Set auth token if authorization is needed
|
||||
if OrchestratorAuthToken:
|
||||
lCookies["AuthToken"] = OrchestratorAuthToken
|
||||
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
||||
lDataJSON = [
|
||||
{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList},
|
||||
{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotResetValue}
|
||||
]
|
||||
try:
|
||||
lResult = requests.post(lURL, json=lDataJSON, cookies = lCookies)
|
||||
lResultJSON = json.loads(lResult.text)
|
||||
#Change data if it changes with ResetValue
|
||||
if lResultJSON[0]["Result"] != RobotResetValue:
|
||||
return (True,lResultJSON[0]["Result"]) #(Flag data changes is ok, Data)
|
||||
else:
|
||||
return (False, lResultJSON[0]["Result"]) # (Flag data changes is false - dont rewrite in RobotStorage, Data)
|
||||
except Exception:
|
||||
mLogger.warning(
|
||||
f"Orchestrator not responding. Def DataReceiveResetAsync, RobotResetValue: {str(RobotResetValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
||||
return (False,None) #(Flag response is not ok, Data None)
|
||||
# Thread init
|
||||
def __init__(self, name):
|
||||
Thread.__init__(self)
|
||||
self.name = name
|
||||
#Thread start
|
||||
def run(self):
|
||||
(lFlagResponseOK,lResponseData) = self.DataRecieveSync()
|
||||
if lFlagResponseOK:
|
||||
RobotStorage[RobotStorageKey] = lResponseData
|
||||
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
||||
ThreadObject.start()
|
||||
return True
|
||||
################################################################################
|
||||
################################################################################
|
||||
#IntervalDataReceiveResetAsync - Periodic receive + every time reset and check changed and reset data on robot storage
|
||||
def IntervalDataReceiveResetAsync(*args,**kwargs):
|
||||
lInterval=3
|
||||
#Delete index 0 from args
|
||||
lArgs=copy.copy(args)
|
||||
if len(lArgs)>0:
|
||||
lInterval = lArgs[0]
|
||||
lArgs = lArgs[1:]
|
||||
#Delete Interval from kwargs
|
||||
lKwargs = copy.copy(kwargs)
|
||||
if "Interval" in lKwargs:
|
||||
lInterval = lKwargs["Interval"]
|
||||
del lKwargs["Interval"]
|
||||
lTimer = TimerRepeat.TimerRepeat(lInterval, DataReceiveResetAsync, lArgs, lKwargs)
|
||||
lTimer.start()
|
||||
return lTimer
|
||||
#################################################################################
|
||||
#################################################################################
|
||||
################################################################################
|
||||
#ConfigurationInit - Get dict configuration and init interval functions
|
||||
def ConfigurationInit(inConfigurationDict):
|
||||
for lItem in inConfigurationDict.keys():
|
||||
lFunction = globals()[lItem]
|
||||
#Iterate throught the nested list
|
||||
for lFunctionConfigurationDict in inConfigurationDict[lItem]:
|
||||
lFunction(**lFunctionConfigurationDict)
|
||||
return True
|
@ -0,0 +1,65 @@
|
||||
import unittest
|
||||
from threading import Timer
|
||||
import sys
|
||||
lFolderPath = "/".join(__file__.split("\\")[:-3])
|
||||
sys.path.insert(0, lFolderPath)
|
||||
from pyOpenRPA.Robot import OrchestratorConnector
|
||||
from pyOpenRPA.Robot import Utils
|
||||
class MyTestCase(unittest.TestCase):
|
||||
def test_something(self):
|
||||
#self.assertEqual(True, False)
|
||||
mGlobal={"Storage":{"R01_OrchestratorToRobot":{"Test":"Test2"}}}
|
||||
# t=OrchestratorConnector.IntervalDataSendAsync(
|
||||
# Interval=1,
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
# t=OrchestratorConnector.DataSendSync(
|
||||
# RobotValue="Test",
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
import time
|
||||
#def Func(lT,inl):
|
||||
# print(lT)
|
||||
# return True
|
||||
#lTimer= Utils.TimerRepeat.TimerRepeat(1, Func, ["dddd"],{"inl":9})
|
||||
#lTimer.start()
|
||||
OrchestratorConnector.ConfigurationInit({
|
||||
"IntervalDataSendResetAsync": [
|
||||
{
|
||||
"Interval": 2,
|
||||
"RobotStorage": mGlobal["Storage"],
|
||||
"RobotStorageKey": "R01_OrchestratorToRobot",
|
||||
"RobotResetValue": {"Test": "Test"},
|
||||
"OrchestratorKeyList": ["Storage", "R01_OrchestratorToRobot"],
|
||||
"OrchestratorProtocol": "http",
|
||||
"OrchestratorHost": "localhost",
|
||||
"OrchestratorPort": 8081,
|
||||
"OrchestratorAuthToken": "1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
}
|
||||
]
|
||||
})
|
||||
while True:
|
||||
print(mGlobal["Storage"]["R01_OrchestratorToRobot"])
|
||||
# t = OrchestratorConnector.DataSendResetAsync(
|
||||
# RobotStorage=mGlobal["Storage"],
|
||||
# RobotStorageKey="R01_OrchestratorToRobot",
|
||||
# RobotResetValue={"Test": "Test"},
|
||||
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
||||
# OrchestratorProtocol="http",
|
||||
# OrchestratorHost="localhost",
|
||||
# OrchestratorPort=8081,
|
||||
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
||||
# )
|
||||
time.sleep(0.5)
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -0,0 +1,28 @@
|
||||
from threading import Timer
|
||||
import datetime
|
||||
# lTimer = RepeatedTimer(3, def, [], {}) # it auto-starts, no need of rt.start()
|
||||
# if def return None = timer stops
|
||||
# lTimer.start()
|
||||
class TimerRepeat(object):
|
||||
def __init__(self, interval, function, args, kwargs):
|
||||
self._timer = None
|
||||
self.interval = interval
|
||||
self.function = function
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.is_running = False
|
||||
self.start()
|
||||
def _run(self):
|
||||
self.is_running = False
|
||||
lResult = self.function(*self.args, **self.kwargs)
|
||||
if lResult is not None:
|
||||
if lResult:
|
||||
self.start()
|
||||
def start(self):
|
||||
if not self.is_running:
|
||||
self._timer = Timer(self.interval, self._run)
|
||||
self._timer.start()
|
||||
self.is_running = True
|
||||
def stop(self):
|
||||
self._timer.cancel()
|
||||
self.is_running = False
|
Loading…
Reference in new issue