Add general defs to work with pyOpenRPA.Orchestrator.Managers.Process via Process key (ProcessNameWOEXE) - need test

dev-linux
Ivan Maslov 3 years ago
parent a41a0a0462
commit 6026c771ae

@ -506,4 +506,9 @@ def Update(inGSettings):
if "ControlPanelDict" not in inGSettings["ServerDict"]:
inGSettings["ServerDict"]["ControlPanelDict"]={}
if lL: lL.warning(
f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ServerDict > ControlPanelDict") # Log about compatibility
f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ServerDict > ControlPanelDict") # Log about compatibility
# ManagersProcessDict
if "ManagersProcessDict" not in inGSettings:
inGSettings["ManagersProcessDict"]={}
if lL: lL.warning(
f"Backward compatibility (v1.2.4 to v1.2.7): Create new key: ManagersProcessDict") # Log about compatibility

@ -1,4 +1,4 @@
#from pyOpenRPA.Orchestrator import Managers
from .. import __Orchestrator__
import os
@ -32,6 +32,7 @@ class Process():
self.mStartCMDStr = inStartCMDStr
self.mProcessNameWOExeStr = inProcessNameWOExeStr
self.mStopSafeTimeoutSecFloat = inStopSafeTimeoutSecFloat
__Orchestrator__.GSettingsGet()["ManagersProcessDict"][inProcessNameWOExeStr.upper()]=self
def Manual2Auto(self):
"""
@ -170,4 +171,95 @@ class Process():
:return:
"""
pass
pass
def ProcessGet(inProcessNameWOExeStr: str) -> Process:
"""
Return the process instance by the inProcessNameWOExeStr
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:return: Process instance (if exists) Else None
"""
return __Orchestrator__.GSettingsGet()["ManagersProcessDict"].get(inProcessNameWOExeStr.upper(),None)
def ProcessStatusStrGet(inProcessNameWOExeStr):
"""
Get the status of the Process instance.
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:return: Str
Process instance has the following statuses:
- 0_STOPPED
- 1_STOPPED_MANUAL
- 2_STOP_SAFE
- 3_STOP_SAFE_MANUAL
- 4_STARTED
- 5_STARTED_MANUAL
- None (if Process instance not exists)
"""
lProcess = ProcessGet(inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: return lProcess.mStatusStr
else: return None
def ProcessStart(inProcessNameWOExeStr: str, inIsManualBool: bool = True) -> None:
"""
Manual/Auto start. Manual start will block scheduling execution. To return schedule execution use def Manual2Auto
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return:
"""
lProcess = ProcessGet(inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: return lProcess.Start(inIsManualBool=inIsManualBool)
else: return None
def ProcessStopSafe(inProcessNameWOExeStr: str, inIsManualBool: bool = True) -> None:
"""
Manual/Auto stop safe. Stop safe is the operation which send signal to process to terminate own work (send term signal to process). Managers.Process wait for the mStopSafeTimeoutSecFloat seconds. After that, if process is not terminated - self will StopForce it.
Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return:
"""
lProcess = ProcessGet(inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: return lProcess.StopSafe(inIsManualBool=inIsManualBool)
else: return None
def ProcessStopForce(inProcessNameWOExeStr: str, inIsManualBool: bool = True) -> None:
"""
Manual/Auto stop force. Force stop dont wait process termination - it just terminate process now.
Manual stop safe will block scheduling execution. To return schedule execution use def Manual2Auto
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:param inIsManualBool: Default is True - Mark this operation as manual - StatusCheckStart/Stop will be blocked - only StatusCheck will be working. False - Auto operation
:return:
"""
lProcess = ProcessGet(inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: return lProcess.StopForce(inIsManualBool=inIsManualBool)
else: return None
def ProcessStatusCheck(inProcessNameWOExeStr: str) -> str:
"""
Check if process is alive. The def will save the manual flag is exists.
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:return: str: Check process status and return it
"""
lProcess = ProcessGet(inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None:
lProcess.StatusCheck()
return lProcess.mStatusStr
else: return None
def ProcessManual2Auto(inProcessNameWOExeStr: str) -> None:
"""
Remove Manual flag from process (if exists) - it will allow the schedule operations via def StatusCheckStart(self): def StatusCheckStorForce(self): def StatusCheckStopSafe(self):
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:return: None
"""
lProcess = ProcessGet(inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: lProcess.Manual2Auto()

@ -148,6 +148,7 @@ def __Create__():
# },
],
},
"ManagersProcessDict":{}, # The key of the Process is mProcessNameWOExeStr.upper()
"ProcessorDict": { # Has been changed. New general processor (one threaded) v.1.2.0
"ActivityList": [ # List of the activities
# {

Loading…
Cancel
Save