This post is one of my "Design Pattern" post collection.
Introduction
In State pattern a class behavior changes based on its state. This type of design pattern comes under behavior pattern. In State pattern, we create objects which represent various states and a context object whose behavior varies as its state object changes.
Today I do some coding work as an exercise on State Pattern in Python.
State examples
States concept is everywhere in this world. For example, I am now in "working state" for this moment, and one hour ago I was in "playing state" (watching Netflix). Another example, water stays in "Liquid State" under room temperature, however it will transit to "Solid State" or "Gas State" when temperature goer higher than 100 deg or lower than 0 deg.
In industrial domain, States concept is also broadly adopted in many technologies. I used to work on medical device development, I thus give two examples in this domain.
Task states in VxWorks RTOS:
CanOpen devices conforming to CanOpen protocol (CiA DS301):
My implementation
I would like to simulate state behaviors of a CANOpen communication node (CiA DS301). As seen in the figure above, it has 4 states:
Initialization, PreOperational, Operational and Stopped. My code below is supposed to simulate ONLY some transitions between its states. In fact, a real CANOpen device is much more complicated than my simulation (see protocol).
Now time to code.
Package & singleton preparation
Firstly import some useful packages and define singleton decorator.
from abc import ABCMeta, abstractmethod
import threading, time
### Singleton Decorator method
def singleton(cls, *args, **kwargs):
__instance = {}
def __singleton(*args, **kwargs):
if cls not in __instance:
__instance[cls] = cls(*args, **kwargs)
else:
pass
return __instance[cls]
return __singleton
Context class
Context
class is designed to possess a list of its states. It should has one current state at each moment in its life cycle. This class should implement a function for transition of state. One can also add state-dependent behaviors. In my case, I defined entryBehavior()
and exitBehavior()
to call a state's behaviors upon its entry and exit.
class Context():
"""Context base class"""
def __init__(self, ContextName):
self.__states = {}
self.__currentState = None # state name
self.__name = ContextName
def addState(self,state):
self.__states[state.getName()] = state
print("States: ", self.__states.keys())
def setState(self, stateName):
if stateName in self.__states :
self.__currentState = self.__states[stateName]
else:
print("Error: unknown state: {}".format(stateName))
def getState(self):
return self.__currentState
def getContextName(self):
return self.__name
## message driven transition
def doTransition(self, msg):
current = self.__currentState.getName()
if msg["from"] == current:
print("Transition from {} to {}".format(msg["from"], msg["to"]))
self.exitBehavior( self.__states[msg["from"]])
self.setState(msg["to"])
self.entryBehavior( self.__states[msg["to"]])
else:
print("Error: Current State is {}, received transition from {} to {}".format(current, msg["from"], msg["to"]))
## Behavior upon entry of a new state
def entryBehavior(self, toState):
if (isinstance(toState,State)):
toState.onEntryBehavior(self)
## Behavior upon exit of present state
def exitBehavior(self, fromState):
if (isinstance(fromState,State)):
fromState.onExitBehavior(self)
State class
State
class is quite simple. It has a private name
member. I defined two abstractmethods onExitBehavior()
andonExitBehavior()
which shall be implemented in derived state classes.
class State(metaclass=ABCMeta):
"""State base class"""
def __init__(self, name):
self.__name = name
def getName(self):
return self.__name
@abstractmethod
def onEntryBehavior(self, CANOpen_Node):
pass
@abstractmethod
def onExitBehavior(self, CANOpen_Node):
pass
Concrete state class
As one example of Concrete state class, I show in this part code of InitializationState
class which is a derived class from State
class. Instance of this state shall call onEntryBehavior()
and "start dispatching heartbeat msgs" upon entry of state. And upon exit, it calls onExitBehavior()
to "stop dispatching heartbeat msgs".
@singleton
class InitializationState(State):
def __init__(self, name):
super().__init__(name)
def onEntryBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
def onExitBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))
Simulation
To realize a full simulation, I introduced a CANOpen_Node
class which is a concrete Context
class. Its instance named "Node_Lidar_2020" is defined in main function. Its possesses instances of all the 4 concrete State
classes. I define also a series of msgs of transitions and their arrival time. Below is my simulation code.
Code of CANOpen_Node
class:
class CANOpen_Node(Context):
"""
Simulated CANOpen 301 Node class
"""
def __init__(self, ContextName):
super().__init__(ContextName)
self.addState(InitializationState("State_Initialization"))
self.addState(PreOperationalState("State_PreOperational"))
self.addState(OperationalState("State_Operational"))
self.addState(StoppedState("State_Stopped"))
self.__active = False
self.__thread = threading.Thread(target=self.communication)
self.__timer = 0
def PowerOn(self):
if self.__thread.isAlive():
pass
else:
self.__active = True
print("Power is ON!")
self.setState("State_Initialization")
print("Automatically entering State_Initialization!")
self.__thread.start()
def PowerOff(self):
self.__active = False
print("Calling Power Off!")
def communication(self):
while self.__active:
print("[HeartBeat] Node {} is in {}".format(self.getContextName(), self.getState().getName()))
time.sleep(1)
self.__timer+=1
print("Power is Off!")
The other 3 concrete State
classes:
@singleton
class PreOperationalState(State):
def __init__(self, name):
super().__init__(name)
def onEntryBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
def onExitBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))
@singleton
class OperationalState(State):
def __init__(self, name):
super().__init__(name)
def onEntryBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
def onExitBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))
@singleton
class StoppedState(State):
def __init__(self, name):
super().__init__(name)
def onEntryBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
def onExitBehavior(self, context):
nn = context.getContextName()
sn = self.getName()
print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))
Now launch the simulation:
## State transition msgs
msg12 = {"from": "State_Initialization", "to": "State_PreOperational"}
msg21 = {"from": "State_PreOperational", "to": "State_Initialization"}
msg23 = {"from": "State_PreOperational", "to": "State_Operational"}
msg32 = {"from": "State_Operational", "to": "State_PreOperational"}
msg34 = {"from": "State_Operational", "to": "State_Stopped"}
msg43 = {"from": "State_Stopped", "to": "State_Operational"}
msg42 = {"from": "State_Stopped", "to": "State_PreOperational"}
msg24 = {"from": "State_PreOperational", "to": "State_Stopped"}
## simulate a ring buffer for msgs
msgQ = [(msg12, 6), (msg23, 7), (msg32, 22), (msg34, 25),(msg24, 28)]
if __name__ == "__main__":
Simulated_Node = CANOpen_Node("Node_Lidar_2020")
step = 0
buffer_head = 0
while (True):
if step == 3:
Simulated_Node.PowerOn()
if step == 30:
Simulated_Node.PowerOff()
break
if step == msgQ[buffer_head][1]:
Simulated_Node.doTransition(msgQ[buffer_head][0])
buffer_head+=1
if buffer_head == len(msgQ):
buffer_head = 0
time.sleep(1)
step+=1
print("========= step {} =========".format(step))
Execution output:
As one can see:
- at step #3, node is powered on and it enters autonomously "Initialization" state. It starts sending HeartBeat signal of its current state.
- at step #6, it makes transition from State_Initialization to State_PreOperational
- at step #7, it makes transition from State_PreOperational to State_Operational
- at step #22, it makes transition from State_Operational to State_PreOperational
- at step #25, it receives msg for transition from State_Operational to State_Stopped, since its current state is not consistent with this transition, it does not transit its state
- at step #28, it makes transition from State_PreOperational to State_Stopped
- at step #30, node is powered off
Top comments (1)
Awesome! I'd like to point out a little change in the following line of code
from: if self.thread.isAlive():
to: if self.thread.is_alive():