# 1.2.0 - general processor - contains old orchestrator processor + RDPActive processor
import copy
import time


# Run processor synchronously
def ProcessorRunSync(inGSettings):
    """
    Run the single-threaded processor loop until "ExecuteBool" is switched off.

    On every iteration the first item of the activity queue is removed and
    executed via ActivityListExecute. When the queue is empty the loop sleeps
    for "CheckIntervalSecFloat" seconds before checking again.

    Expected structure of inGSettings["ProcessorDict"]:

        "ProcessorDict": { # Has been changed. New general processor (one threaded) v.1.2.0
            "ActivityList": [ # List of the activities (used as a FIFO queue)
                # {
                #    "Def":"DefAliasTest", # def link or def alias (look gSettings["ProcessorDict"]["AliasDefDict"])
                #    "ArgList":[1,2,3], # Args list
                #    "ArgDict":{"ttt":1,"222":2,"dsd":3}, # Args dictionary
                #    "ArgGSettings": None # Name of GSettings attribute: str (ArgDict) or index (for ArgList)
                # },
            ],
            "AliasDefDict": {},  # Storage for def with Str alias
            "CheckIntervalSecFloat": 1.0, # Interval for check gSettings in ProcessorDict > ActivityList
            "ExecuteBool": True # Flag to execute thread processor
        }

    :param inGSettings: global settings dict; must contain "ProcessorDict" (see above)
        and "Logger" (read by ActivityListExecute).
    :return: None. Returns only after "ExecuteBool" becomes falsy.
    """
    while inGSettings["ProcessorDict"]["ExecuteBool"]:
        try:
            # Extract the first item from the processor queue.
            # list.pop accepts no default value, so an empty queue is reported by IndexError.
            lActivityItem = inGSettings["ProcessorDict"]["ActivityList"].pop(0)
        except IndexError:
            # Queue is empty - sleep before the next check
            time.sleep(inGSettings["ProcessorDict"]["CheckIntervalSecFloat"])
            continue
        # Execute the extracted activity item exactly once
        ActivityListExecute(inGSettings=inGSettings, inActivityList=[lActivityItem])


# Execute ActivityItem list
# return the def result
def ActivityListExecute(inGSettings, inActivityList):
    """
    Execute every activity item of the list and collect the results.

    Each activity item is a dict with the keys:
        "Def": callable or str alias which is looked up in
            inGSettings["ProcessorDict"]["AliasDefDict"]
        "ArgList": positional args (optional, default is an empty list)
        "ArgDict": keyword args (optional, default is an empty dict)
        "ArgGSettings": optional; str - inGSettings is passed as the keyword arg
            with that name; int - inGSettings is inserted into the positional
            args at that index; any other value - inGSettings is not passed.

    The activity items and the alias storage are not modified.

    :param inGSettings: global settings dict; "Logger" and "ProcessorDict" keys are read.
    :param inActivityList: list of the activity item dicts.
    :return: list with one entry per activity item, in the same order: the value
        returned by the def, or the Exception instance if the call has failed
        (including the case when the alias is unknown).
    """
    lL = inGSettings["Logger"]  # Logger alias
    lResultList = []  # init the result list
    for lActivityItem in inActivityList:  # Iterate through the activity list
        lDef = lActivityItem["Def"]
        if not callable(lDef):
            # Is not callable - resolve the alias. Use get (not pop) to keep the alias
            # registered for the next activities.
            lDef = inGSettings["ProcessorDict"]["AliasDefDict"].get(lDef, None)
        # Work on copies - the activity item of the caller must stay untouched
        lArgList = list(lActivityItem.get("ArgList") or [])
        lArgDict = dict(lActivityItem.get("ArgDict") or {})
        lGSettingsKey = lActivityItem.get("ArgGSettings", None)
        if isinstance(lGSettingsKey, str):
            # Pass gSettings as the keyword argument with the requested name
            lArgDict[lGSettingsKey] = inGSettings
        elif isinstance(lGSettingsKey, int) and not isinstance(lGSettingsKey, bool):
            # Pass gSettings as the positional argument by the requested index
            lArgList.insert(lGSettingsKey, inGSettings)
        try:  # try to run the def
            # If the alias was not resolved lDef is None - the TypeError is handled below
            lResultList.append(lDef(*lArgList, **lArgDict))
        except Exception as e:
            if lL:
                lL.exception(f"Processor.ActivityListExecute: Exception in def execution - activity will be ignored. Activity item: {lActivityItem}")  # Logging
            lResultList.append(e)  # return the generated exception
    return lResultList  # return the result list