DEV Community

Cover image for Design Pattern in Python (3): State Pattern
Z. QIU
Z. QIU

Posted on

Design Pattern in Python (3): State Pattern

This post is one of my "Design Pattern" post collection.

Introduction

In State pattern a class behavior changes based on its state. This type of design pattern comes under behavior pattern. In State pattern, we create objects which represent various states and a context object whose behavior varies as its state object changes.

Alt Text

Today I do some coding work as an exercise on State Pattern in Python.

State examples

States concept is everywhere in this world. For example, I am now in "working state" for this moment, and one hour ago I was in "playing state" (watching Netflix). Another example, water stays in "Liquid State" under room temperature, however it will transit to "Solid State" or "Gas State" when temperature goer higher than 100 deg or lower than 0 deg.

Alt Text

In industrial domain, States concept is also broadly adopted in many technologies. I used to work on medical device development, I thus give two examples in this domain.

Task states in VxWorks RTOS:

Alt Text

CanOpen devices conforming to CanOpen protocol (CiA DS301):
Alt Text

My implementation

I would like to simulate state behaviors of a CANOpen communication node (CiA DS301). As seen in the figure above, it has 4 states:
Initialization, PreOperational, Operational and Stopped. My code below is supposed to simulate ONLY some transitions between its states. In fact, a real CANOpen device is much more complicated than my simulation (see protocol).

Now time to code.

Package & singleton preparation

Firstly import some useful packages and define singleton decorator.

from abc import ABCMeta, abstractmethod
import threading, time

### Singleton Decorator method
def singleton(cls, *args, **kwargs):
    __instance = {}

    def __singleton(*args, **kwargs):

        if cls not in __instance:
            __instance[cls] = cls(*args, **kwargs)
        else:
            pass

        return __instance[cls]
    return __singleton
Enter fullscreen mode Exit fullscreen mode

Context class

Context class is designed to possess a list of its states. It should has one current state at each moment in its life cycle. This class should implement a function for transition of state. One can also add state-dependent behaviors. In my case, I defined entryBehavior() and exitBehavior() to call a state's behaviors upon its entry and exit.

class Context():
   """Context base class"""
    def __init__(self, ContextName):

        self.__states = {}
        self.__currentState = None    # state name     
        self.__name = ContextName

    def addState(self,state):
        self.__states[state.getName()] = state
        print("States: ",  self.__states.keys())

    def setState(self, stateName):
        if stateName in self.__states :
            self.__currentState = self.__states[stateName]
        else:
            print("Error: unknown state: {}".format(stateName))

    def getState(self):
        return self.__currentState

    def getContextName(self):
        return self.__name   

    ## message driven transition
    def doTransition(self, msg):
        current = self.__currentState.getName()
        if msg["from"] == current:
            print("Transition from {} to {}".format(msg["from"], msg["to"]))
            self.exitBehavior( self.__states[msg["from"]])
            self.setState(msg["to"])
            self.entryBehavior(  self.__states[msg["to"]])    
        else:
            print("Error: Current State is {}, received transition from {} to {}".format(current, msg["from"], msg["to"]))

    ## Behavior upon entry of a new state
    def entryBehavior(self, toState):        
        if (isinstance(toState,State)):
            toState.onEntryBehavior(self)

    ## Behavior upon exit of present state
    def exitBehavior(self, fromState):        
        if (isinstance(fromState,State)):
            fromState.onExitBehavior(self)
Enter fullscreen mode Exit fullscreen mode

State class

State class is quite simple. It has a private name member. I defined two abstractmethods onExitBehavior() andonExitBehavior() which shall be implemented in derived state classes.

class State(metaclass=ABCMeta):
    """State base class"""

    def __init__(self, name):
        self.__name = name

    def getName(self):
        return self.__name

    @abstractmethod
    def onEntryBehavior(self, CANOpen_Node):
        pass 

    @abstractmethod
    def onExitBehavior(self, CANOpen_Node):
        pass 
Enter fullscreen mode Exit fullscreen mode

Concrete state class

As one example of Concrete state class, I show in this part code of InitializationStateclass which is a derived class from State class. Instance of this state shall call onEntryBehavior() and "start dispatching heartbeat msgs" upon entry of state. And upon exit, it calls onExitBehavior() to "stop dispatching heartbeat msgs".

@singleton
class InitializationState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))

    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching  heartbeat msg of state - {}".format(nn, sn))
Enter fullscreen mode Exit fullscreen mode

Simulation

To realize a full simulation, I introduced a CANOpen_Node class which is a concrete Context class. Its instance named "Node_Lidar_2020" is defined in main function. Its possesses instances of all the 4 concrete State classes. I define also a series of msgs of transitions and their arrival time. Below is my simulation code.

Code of CANOpen_Node class:


class CANOpen_Node(Context):
    """
    Simulated CANOpen 301 Node class
    """

    def __init__(self, ContextName):
        super().__init__(ContextName)

        self.addState(InitializationState("State_Initialization"))        
        self.addState(PreOperationalState("State_PreOperational"))        
        self.addState(OperationalState("State_Operational"))
        self.addState(StoppedState("State_Stopped"))
        self.__active = False
        self.__thread = threading.Thread(target=self.communication)
        self.__timer = 0


    def PowerOn(self):

        if  self.__thread.isAlive():            
            pass
        else:
            self.__active = True
            print("Power is ON!")
            self.setState("State_Initialization")
            print("Automatically entering State_Initialization!")
            self.__thread.start()


    def PowerOff(self):
        self.__active = False
        print("Calling Power Off!")

    def communication(self):
        while self.__active:
            print("[HeartBeat] Node {} is in {}".format(self.getContextName(), self.getState().getName()))
            time.sleep(1)
            self.__timer+=1
        print("Power is Off!")

Enter fullscreen mode Exit fullscreen mode

The other 3 concrete State classes:


@singleton           
class PreOperationalState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching  heartbeat msg of state - {}".format(nn, sn))

@singleton          
class OperationalState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching  heartbeat msg of state - {}".format(nn, sn))

@singleton          
class StoppedState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching  heartbeat msg of state - {}".format(nn, sn))

Enter fullscreen mode Exit fullscreen mode

Now launch the simulation:

## State transition msgs
msg12 = {"from": "State_Initialization", "to": "State_PreOperational"}
msg21 = {"from": "State_PreOperational", "to": "State_Initialization"}
msg23 = {"from": "State_PreOperational", "to": "State_Operational"}
msg32 = {"from": "State_Operational", "to": "State_PreOperational"}
msg34 = {"from": "State_Operational", "to": "State_Stopped"}
msg43 = {"from": "State_Stopped", "to": "State_Operational"}
msg42 = {"from": "State_Stopped", "to": "State_PreOperational"}
msg24 = {"from": "State_PreOperational", "to": "State_Stopped"}

## simulate a ring buffer for msgs 
msgQ = [(msg12, 6), (msg23, 7), (msg32, 22), (msg34, 25),(msg24, 28)]

if __name__ == "__main__":

    Simulated_Node = CANOpen_Node("Node_Lidar_2020")
    step = 0
    buffer_head = 0
    while (True):

        if step == 3:
            Simulated_Node.PowerOn()
        if step == 30:
            Simulated_Node.PowerOff()
            break

        if step == msgQ[buffer_head][1]:
            Simulated_Node.doTransition(msgQ[buffer_head][0])
            buffer_head+=1
            if buffer_head ==  len(msgQ):
                buffer_head = 0


        time.sleep(1)
        step+=1
        print("========= step {} =========".format(step))
Enter fullscreen mode Exit fullscreen mode

Execution output:

Alt Text

As one can see:

  • at step #3, node is powered on and it enters autonomously "Initialization" state. It starts sending HeartBeat signal of its current state.
  • at step #6, it makes transition from State_Initialization to State_PreOperational
  • at step #7, it makes transition from State_PreOperational to State_Operational
  • at step #22, it makes transition from State_Operational to State_PreOperational
  • at step #25, it receives msg for transition from State_Operational to State_Stopped, since its current state is not consistent with this transition, it does not transit its state
  • at step #28, it makes transition from State_PreOperational to State_Stopped
  • at step #30, node is powered off

Top comments (1)

Collapse
 
paulohsgoes profile image
paulohsgoes

Awesome! I'd like to point out a little change in the following line of code

from: if self.thread.isAlive():
to: if self.
thread.is_alive():