You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
401 lines
19 KiB
401 lines
19 KiB
import requests
|
|
#Logging
|
|
import os
|
|
import logging
|
|
import datetime
|
|
import copy
|
|
from .Utils import TimerRepeat # Timer which can repeating
|
|
mLogger=logging.getLogger("OrchestratorConnector")
|
|
#########################
|
|
mTimerList=[]
|
|
def IntervalTerminateAll():
|
|
for lItem in mTimerList:
|
|
lItem.stop()
|
|
#########################
|
|
# Создать файл логирования
|
|
# add filemode="w" to overwrite
|
|
if not os.path.exists("Reports"):
|
|
os.makedirs("Reports")
|
|
##########################
|
|
# Подготовка логгера Robot
|
|
#########################
|
|
mLogger.setLevel(logging.INFO)
|
|
# create the logging file handler
|
|
mLoggerFH = logging.FileHandler("Reports\ReportOrchestratorConnector_" + datetime.datetime.now().strftime("%Y_%m_%d") + ".log")
|
|
mLoggerFormatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
mLoggerFH.setFormatter(mLoggerFormatter)
|
|
# add handler to logger object
|
|
mLogger.addHandler(mLoggerFH)
|
|
############################################
|
|
#Turn loggin level ERROR
|
|
def LoggerSetLevelError():
|
|
mLogger.setLevel(logging.ERROR)
|
|
#from requests import async
|
|
import json
|
|
###################################
|
|
##Orchestrator integration module (safe use when orchestrator is turned off)
|
|
###################################
|
|
################################################################################
|
|
# Recieve data from orchestrator (synchronyous)
|
|
# Example:
|
|
# t=IntegrationOrchestrator.DataRecieveAsync(
|
|
# RobotStorage=mGlobal["Storage"],
|
|
# RobotStorageKey="R01_OrchestratorToRobot",
|
|
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
|
# OrchestratorProtocol="http",
|
|
# OrchestratorHost="localhost",
|
|
# OrchestratorPort=8081,
|
|
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
|
# )
|
|
def DataReceiveSync(
|
|
OrchestratorKeyList, OrchestratorProtocol="http",
|
|
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
|
):
|
|
lCookies = {}
|
|
# Set auth token if authorization is needed
|
|
if OrchestratorAuthToken:
|
|
lCookies["AuthToken"] = OrchestratorAuthToken
|
|
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
|
lDataJSON = [{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}]
|
|
try:
|
|
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
|
lResultJSON = json.loads(lResult.text)
|
|
return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data)
|
|
except Exception:
|
|
mLogger.warning(
|
|
f"Orchestrator not responding. Def DataRecieveSync, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
|
return (False, None) # (Flag response is not ok, Data None)
|
|
################################################################################
|
|
# Recieve data from orchestrator (asynchronyous)
|
|
# Example:
|
|
# t=IntegrationOrchestrator.DataRecieveAsync(
|
|
# RobotStorage=mGlobal["Storage"],
|
|
# RobotStorageKey="R01_OrchestratorToRobot",
|
|
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
|
# OrchestratorProtocol="http",
|
|
# OrchestratorHost="localhost",
|
|
# OrchestratorPort=8081,
|
|
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
|
# )
|
|
def DataReceiveAsync(
|
|
RobotStorage, RobotStorageKey, OrchestratorKeyList, OrchestratorProtocol="http",
|
|
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
|
):
|
|
from threading import Thread
|
|
import uuid
|
|
global mGlobalDict
|
|
class ThreadAsync(Thread):
|
|
def DataRecieveSync(self):
|
|
lCookies = {}
|
|
#Set auth token if authorization is needed
|
|
if OrchestratorAuthToken:
|
|
lCookies["AuthToken"] = OrchestratorAuthToken
|
|
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
|
lDataJSON = [{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList}]
|
|
try:
|
|
lResult = requests.post(lURL, json=lDataJSON, cookies = lCookies)
|
|
lResultJSON = json.loads(lResult.text)
|
|
return (True,lResultJSON[0]["Result"]) #(Flag response is ok, Data)
|
|
except Exception:
|
|
mLogger.warning(
|
|
f"Orchestrator not responding. Def DataRecieveAsync, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
|
return (False,None) #(Flag response is not ok, Data None)
|
|
# Thread init
|
|
def __init__(self, name):
|
|
Thread.__init__(self)
|
|
self.name = name
|
|
#Thread start
|
|
def run(self):
|
|
(lFlagResponseOK,lResponseData) = self.DataRecieveSync()
|
|
if lFlagResponseOK:
|
|
RobotStorage[RobotStorageKey] = lResponseData
|
|
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
|
ThreadObject.start()
|
|
return True
|
|
################################################################################
|
|
#IntervalDataRecieveAsync - Periodic recieve data from orchestrator and update storage
|
|
def IntervalDataReceiveAsync(*args, **kwargs):
|
|
lInterval=3
|
|
#Delete index 0 from args
|
|
lArgs=copy.copy(args)
|
|
if len(lArgs)>0:
|
|
lInterval = lArgs[0]
|
|
lArgs = lArgs[1:]
|
|
#Delete Interval from kwargs
|
|
lKwargs = copy.copy(kwargs)
|
|
if "Interval" in lKwargs:
|
|
lInterval = lKwargs["Interval"]
|
|
del lKwargs["Interval"]
|
|
lTimer = TimerRepeat.TimerRepeat(lInterval, DataReceiveAsync, lArgs, lKwargs)
|
|
lTimer.start()
|
|
#Add timer to general list to stop this when needed
|
|
mTimerList.append(lTimer)
|
|
return lTimer
|
|
################################################################################
|
|
###################################
|
|
################################
|
|
###################################
|
|
################################################################################
|
|
# Send data from orchestrator (synchronyous)
|
|
# Example:
|
|
# t=IntegrationOrchestrator.DataSendSync(
|
|
# RobotValue="Value",
|
|
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
|
# OrchestratorProtocol="http",
|
|
# OrchestratorHost="localhost",
|
|
# OrchestratorPort=8081,
|
|
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
|
# )
|
|
def DataSendSync(
|
|
RobotValue, OrchestratorKeyList, OrchestratorProtocol="http",
|
|
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
|
):
|
|
lCookies = {}
|
|
# Set auth token if authorization is needed
|
|
if OrchestratorAuthToken:
|
|
lCookies["AuthToken"] = OrchestratorAuthToken
|
|
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
|
lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}]
|
|
try:
|
|
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
|
lResultJSON = json.loads(lResult.text)
|
|
return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data)
|
|
except Exception:
|
|
mLogger.warning(
|
|
f"Orchestrator not responding. Def: DataSendSync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
|
return (False, None) # (Flag response is not ok, Data None)
|
|
################################################################################
|
|
# Send data from orchestrator (asynchronyous)
|
|
# Example:
|
|
# t=IntegrationOrchestrator.DataSendAsync(
|
|
# RobotStorage=mGlobal["Storage"],
|
|
# RobotStorageKey="R01_OrchestratorToRobot",
|
|
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
|
# OrchestratorProtocol="http",
|
|
# OrchestratorHost="localhost",
|
|
# OrchestratorPort=8081,
|
|
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
|
# )
|
|
def DataSendAsync(
|
|
RobotStorage, RobotStorageKey, OrchestratorKeyList, OrchestratorProtocol="http",
|
|
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
|
):
|
|
from threading import Thread
|
|
import uuid
|
|
global mGlobalDict
|
|
class ThreadAsync(Thread):
|
|
def DataSendSync(self):
|
|
RobotValue = RobotStorage[RobotStorageKey]
|
|
lCookies = {}
|
|
# Set auth token if authorization is needed
|
|
if OrchestratorAuthToken:
|
|
lCookies["AuthToken"] = OrchestratorAuthToken
|
|
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
|
lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}]
|
|
try:
|
|
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
|
lResultJSON = json.loads(lResult.text)
|
|
return (True, lResultJSON[0]["Result"]) # (Flag response is ok, Data)
|
|
except Exception:
|
|
mLogger.warning(
|
|
f"Orchestrator not responding. Def: DataSendAsync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
|
return (False, None) # (Flag response is not ok, Data None)
|
|
# Thread init
|
|
def __init__(self, name):
|
|
Thread.__init__(self)
|
|
self.name = name
|
|
#Thread start
|
|
def run(self):
|
|
self.DataSendSync()
|
|
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
|
ThreadObject.start()
|
|
return True
|
|
################################################################################
|
|
#IntervalDataSendAsync - Periodic send data from robot to orchestrator
|
|
def IntervalDataSendAsync(*args,**kwargs):
|
|
lInterval=3
|
|
#Delete index 0 from args
|
|
lArgs=copy.copy(args)
|
|
if len(lArgs)>0:
|
|
lInterval = lArgs[0]
|
|
lArgs = lArgs[1:]
|
|
#Delete Interval from kwargs
|
|
lKwargs = copy.copy(kwargs)
|
|
if "Interval" in lKwargs:
|
|
lInterval = lKwargs["Interval"]
|
|
del lKwargs["Interval"]
|
|
lTimer = TimerRepeat.TimerRepeat(lInterval, DataSendAsync, lArgs, lKwargs)
|
|
lTimer.start()
|
|
#Add timer to general list to stop this when needed
|
|
mTimerList.append(lTimer)
|
|
return lTimer
|
|
################################################################################
|
|
###################################
|
|
################################
|
|
###################################
|
|
################################################################################
|
|
# Check if RobotStorage[Key] Value has been changed > then send data + reset to orchestrator (asynchronyous) timeout 2 seconds
|
|
# Example:
|
|
# t=IntegrationOrchestrator.DataSendResetAsync(
|
|
# RobotStorage=mGlobal["Storage"],
|
|
# RobotStorageKey="R01_OrchestratorToRobot",
|
|
# RobotResetValue="Test",
|
|
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
|
# OrchestratorProtocol="http",
|
|
# OrchestratorHost="localhost",
|
|
# OrchestratorPort=8081,
|
|
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
|
# )
|
|
def DataSendResetAsync(
|
|
RobotStorage, RobotStorageKey, RobotResetValue, OrchestratorKeyList, OrchestratorProtocol="http",
|
|
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
|
):
|
|
#Do operations if data not equal to ResetValue
|
|
if RobotStorage[RobotStorageKey] != RobotResetValue:
|
|
#Get value
|
|
lRobotValue = copy.deepcopy(RobotStorage[RobotStorageKey])
|
|
#Reset value
|
|
RobotStorage[RobotStorageKey] = copy.deepcopy(RobotResetValue)
|
|
#Send data (retry while data will be transferred completele)
|
|
from threading import Thread
|
|
import uuid
|
|
import time
|
|
global mGlobalDict
|
|
class ThreadAsync(Thread):
|
|
def DataSendSync(self):
|
|
RobotValue = lRobotValue
|
|
lCookies = {}
|
|
# Set auth token if authorization is needed
|
|
if OrchestratorAuthToken:
|
|
lCookies["AuthToken"] = OrchestratorAuthToken
|
|
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
|
lDataJSON = [{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotValue}]
|
|
lFlagDataTransmit = False
|
|
while not lFlagDataTransmit:
|
|
try:
|
|
lResult = requests.post(lURL, json=lDataJSON, cookies=lCookies)
|
|
lResultJSON = json.loads(lResult.text)
|
|
lFlagDataTransmit = True
|
|
except Exception:
|
|
mLogger.warning(
|
|
f"Orchestrator not responding - will retry to send update. Timeout 2 seconds. Def: DataSendResetAsync, RobotValue: {str(RobotValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
|
time.sleep(2) #Timout for next loop
|
|
return (True,True) # Only True can be returned
|
|
# Thread init
|
|
def __init__(self, name):
|
|
Thread.__init__(self)
|
|
self.name = name
|
|
# Thread start
|
|
def run(self):
|
|
self.DataSendSync()
|
|
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
|
ThreadObject.start()
|
|
return True
|
|
return True
|
|
################################################################################
|
|
################################################################################
|
|
#IntervalDataSendResetAsync - Periodic check changed and send + reset data from robot to orchestrator
|
|
def IntervalDataSendResetAsync(*args,**kwargs):
|
|
lInterval=3
|
|
#Delete index 0 from args
|
|
lArgs=copy.copy(args)
|
|
if len(lArgs)>0:
|
|
lInterval = lArgs[0]
|
|
lArgs = lArgs[1:]
|
|
#Delete Interval from kwargs
|
|
lKwargs = copy.copy(kwargs)
|
|
if "Interval" in lKwargs:
|
|
lInterval = lKwargs["Interval"]
|
|
del lKwargs["Interval"]
|
|
lTimer = TimerRepeat.TimerRepeat(lInterval, DataSendResetAsync, lArgs, lKwargs)
|
|
lTimer.start()
|
|
#Add timer to general list to stop this when needed
|
|
mTimerList.append(lTimer)
|
|
return lTimer
|
|
################################################################################
|
|
# Check changes in orchestrator - then replace in RobotStorage if not equeal. Has no timeout because You can use function IntervalDataReceiveResetAsync (asynchronyous)
|
|
#Next iteration do not rewrite value until new change has come from orchestrator
|
|
# Example:
|
|
# t=IntegrationOrchestrator.DataRecieveAsync(
|
|
# RobotStorage=mGlobal["Storage"],
|
|
# RobotStorageKey="R01_OrchestratorToRobot",
|
|
# RobotResetValue={"Test":"Test"},
|
|
# OrchestratorKeyList=["Storage", "R01_OrchestratorToRobot"],
|
|
# OrchestratorProtocol="http",
|
|
# OrchestratorHost="localhost",
|
|
# OrchestratorPort=8081,
|
|
# OrchestratorAuthToken="1992-04-03-0643-ru-b4ff-openrpa52zzz"
|
|
# )
|
|
def DataReceiveResetAsync(
|
|
RobotStorage, RobotStorageKey, RobotResetValue, OrchestratorKeyList, OrchestratorProtocol="http",
|
|
OrchestratorHost="localhost", OrchestratorPort=80, OrchestratorAuthToken=None
|
|
):
|
|
from threading import Thread
|
|
import uuid
|
|
global mGlobalDict
|
|
class ThreadAsync(Thread):
|
|
def DataRecieveSync(self):
|
|
lCookies = {}
|
|
#Set auth token if authorization is needed
|
|
if OrchestratorAuthToken:
|
|
lCookies["AuthToken"] = OrchestratorAuthToken
|
|
lURL = f'{OrchestratorProtocol}://{OrchestratorHost}:{OrchestratorPort}/Utils/Processor'
|
|
lDataJSON = [
|
|
{"Type": "GlobalDictKeyListValueGet", "KeyList": OrchestratorKeyList},
|
|
{"Type": "GlobalDictKeyListValueSet", "KeyList": OrchestratorKeyList, "Value": RobotResetValue}
|
|
]
|
|
try:
|
|
lResult = requests.post(lURL, json=lDataJSON, cookies = lCookies)
|
|
lResultJSON = json.loads(lResult.text)
|
|
#Change data if it changes with ResetValue
|
|
if lResultJSON[0]["Result"] != RobotResetValue:
|
|
return (True,lResultJSON[0]["Result"]) #(Flag data changes is ok, Data)
|
|
else:
|
|
return (False, lResultJSON[0]["Result"]) # (Flag data changes is false - dont rewrite in RobotStorage, Data)
|
|
except Exception:
|
|
mLogger.warning(
|
|
f"Orchestrator not responding. Def DataReceiveResetAsync, RobotResetValue: {str(RobotResetValue)}, OrchestratorKeyList: {str(OrchestratorKeyList)}, OrchestratorProtocol: {str(OrchestratorProtocol)}, OrchestratorHost: {str(OrchestratorHost)}, OrchestratorPort: {str(OrchestratorPort)}")
|
|
return (False,None) #(Flag response is not ok, Data None)
|
|
# Thread init
|
|
def __init__(self, name):
|
|
Thread.__init__(self)
|
|
self.name = name
|
|
#Thread start
|
|
def run(self):
|
|
(lFlagResponseOK,lResponseData) = self.DataRecieveSync()
|
|
if lFlagResponseOK:
|
|
RobotStorage[RobotStorageKey] = lResponseData
|
|
ThreadObject = ThreadAsync(f"ThreadAsync{str(uuid.uuid1())}")
|
|
ThreadObject.start()
|
|
return True
|
|
################################################################################
|
|
################################################################################
|
|
#IntervalDataReceiveResetAsync - Periodic receive + every time reset and check changed and reset data on robot storage
|
|
def IntervalDataReceiveResetAsync(*args,**kwargs):
|
|
lInterval=3
|
|
#Delete index 0 from args
|
|
lArgs=copy.copy(args)
|
|
if len(lArgs)>0:
|
|
lInterval = lArgs[0]
|
|
lArgs = lArgs[1:]
|
|
#Delete Interval from kwargs
|
|
lKwargs = copy.copy(kwargs)
|
|
if "Interval" in lKwargs:
|
|
lInterval = lKwargs["Interval"]
|
|
del lKwargs["Interval"]
|
|
lTimer = TimerRepeat.TimerRepeat(lInterval, DataReceiveResetAsync, lArgs, lKwargs)
|
|
lTimer.start()
|
|
#Add timer to general list to stop this when needed
|
|
mTimerList.append(lTimer)
|
|
return lTimer
|
|
#################################################################################
|
|
#################################################################################
|
|
################################################################################
|
|
#ConfigurationInit - Get dict configuration and init interval functions
|
|
def ConfigurationInit(inConfigurationDict):
|
|
for lItem in inConfigurationDict.keys():
|
|
lFunction = globals()[lItem]
|
|
#Iterate throught the nested list
|
|
for lFunctionConfigurationDict in inConfigurationDict[lItem]:
|
|
lFunction(**lFunctionConfigurationDict)
|
|
return True |