วันพุธที่ 8 พฤษภาคม พ.ศ. 2562

Simple State Machine with Python

เกริ่นนำ

State Machine (ชื่อเต็มคือ Finite - state Machine) เป็นตัวแบบในการคิดว่า machine จะเป็นได้เพียงหนึ่งสถานะเท่านั้น ณ เวลาใด เวลาหนึ่ง ตัวอย่างที่ยกกันมาบ่อย ๆ และเห็นกันได้ในชีวิตประจำวันคือ สัญญาณไฟจราจร (เขียว เหลือง แดง)



สัญญาณไฟจะมีสถานะที่เป็นไปได้สามสถานะคือ ไป (green) , ชลอ (amber) และหยุด (red) เวลาใดเวลาหนึ่งจะเป็นได้เพียงหนึ่งสถานะเท่านั้น สามารถเขียนแผนภาพได้ดังนี้




ในความเป็นจริงสัญญาณไฟจะไม่อยู่ในสถานะใดสถานะหนึ่งตลอดไปจะต้องมีการเปลี่ยนสถานะ สิ่งที่ทำให้เกิดการเปลี่ยนสถานะอาจเป็นเวลา เจ้าหน้าที่กดเปลี่ยนสัญญาณหรืออื่นๆ ในกรณีที่ใช้เวลา ไฟสัญญาณจะเปลี่ยนไปเมื่อถึงเวลาที่กำหนด





สร้างแบบจำลอง

ในความเป็นจริงสัญญาณไฟจะต้องมีจำนวนเท่ากับจำนวนแยก เช่น สามแยกก็ต้องมี 3 ระบบที่ทำงานสอดคล้องกัน แต่ในตอนนี้จะสร้างเพียงด้านเดียวเท่านั้น งานนี้เราต้องการตัวทำงาน 4 ตัวคือ State, Device , StateMachine และ Timer

State :  เพื่อใช้เก็บข้อมูลของ State ที่เป็นไปได้ทั้งหมด

class State() :
 IDLE = 0
 GREEN = 1
 AMBER = 2
 RED = 3


Devices : เป็นตัวแทนของอุปกรณ์หรือ machine ที่จะต้องทำงานในระบบ

class Light():
 def __init__(self,gpio):
  self.gpio = gpio
  self.led = LED(gpio)
  self.state = 0
  
 def _on(self):
  if self.state == 0:
   self.led.on()
   self.state = 1
  
 def _off(self):
  if self.state == 1:
   self.led.off()
   self.state = 0

class Devices():
 def __init__(self):
  self.green_light = Light(GREEN_GPIO)
  self.amber_light = Light(AMBER_GPIO)
  self.red_light = Light(RED_GPIO)

 def response(self,state):
  if state == State.GREEN :
   self.amber_light._off()
   self.red_light._off()
   self.green_light._on()
  elif state == State.RED :
   self.amber_light._off()
   self.green_light._off()
   self.red_light._on() 
  elif state == State.AMBER :
   self.green_light._off()
   self.red_light._off()
   self.amber_light._on()
   
 def clean(self):
  self.green_light._off()
  self.amber_light._off()
  self.red_light._off()


State Machine : class ตัวหลักในการทำงาน

class StateMachine():
 def __init__(self):
  self.state = State.IDLE
  self.devices = Devices()
    
 
 def chage_state(self):
  if self.state == State.IDLE :
   self.state = State.GREEN
  elif self.state == State.GREEN :
   self.state = State.AMBER
  elif self.state == State.AMBER :
   self.state = State.RED
  elif self.state == State.RED :
   self.state = State.GREEN
  self.devices.response(self.state)
  
 def clean(self):
  self.devices.clean()


เราต้องการ Looper หรือ Time interval ผมเลือกใช้ timeloop ซึ่งเป็น library หนึ่งที่ใช้งานง่าย และกำหนดให้ WAIT_TIME  สำหรับการเปลี่ยนสัญญาณไฟมีค่าคงที่ 5 วินาที

WAIT_TIME = 5

state_mach = StateMachine()
looper = Timeloop()

@looper.job(interval=timedelta(seconds=WAIT_TIME))
def do_it():
 global state_mach
 print(time.ctime())
 state_mach.chage_state()

looper.start()


เมื่อได้ตัวละครครบแล้วก็ประกอบเป็นงานขึ้นมา


import time
from datetime import timedelta
from timeloop import Timeloop
from gpiozero import LED

# constants
RED_GPIO = 16 # pin 36
AMBER_GPIO = 20 # pin 38
GREEN_GPIO = 21 # pin 40
WAIT_TIME = 5

class State() :
    # assign values before being used
    IDLE = 0
    GREEN = 1
    AMBER = 2
    RED = 3
 
 
class Light():
    def __init__(self,gpio):
        self.gpio = gpio
        self.led = LED(gpio)
        self.state = 0 # 0 : off, 1 : on
  
    def _on(self):
        if self.state == 0:
            self.led.on()
            self.state = 1
  
    def _off(self):
        if self.state == 1:
            self.led.off()
            self.state = 0

class Devices():
    def __init__(self):
        self.green_light = Light(GREEN_GPIO)
        self.amber_light = Light(AMBER_GPIO)
        self.red_light = Light(RED_GPIO)

    def response(self,state):
        if state == State.GREEN :
            self.amber_light._off()
            self.red_light._off()
            self.green_light._on()
        elif state == State.RED :
            self.amber_light._off()
            self.green_light._off()
            self.red_light._on() 
        elif state == State.AMBER :
            self.green_light._off()
            self.red_light._off()
            self.amber_light._on()
   
    def clean(self):
        self.green_light._off()
        self.amber_light._off()
        self.red_light._off()
  
class StateMachine():
    def __init__(self):
        self.state = State.IDLE
        self.devices = Devices()    
 
    def chage_state(self):
        if self.state == State.IDLE :
            self.state = State.GREEN
        elif self.state == State.GREEN :
            self.state = State.AMBER
        elif self.state == State.AMBER :
            self.state = State.RED
        elif self.state == State.RED :
            self.state = State.GREEN
        self.devices.response(self.state)
  
    def clean(self):
        self.devices.clean()
      
 


state_mach = StateMachine()
looper = Timeloop()

@looper.job(interval=timedelta(seconds=WAIT_TIME))
def do_it():
    global state_mach
    print(time.ctime())
    state_mach.chage_state()

looper.start()
while True :
    try:
        time.sleep(1)
    except KeyboardInterrupt :
        looper.stop()
        state_mach.clean()
        break



ต่อยอดไปอีกหน่อย ถ้าเราต้องการกำหนดให้ Time interval การแสดงสัญญาณไม่เท่ากัน เช่น แสดงไฟเขียว 5 วินาที ไฟเหลือง 2 วินาที และไฟแดง 10 วินาที ก็ย่อมได้ โดยการปรับปรุง code นิดหน่อย

import threading
from gpiozero import LED

# constants
RED_GPIO = 16 # pin 36
AMBER_GPIO = 20 # pin 38
GREEN_GPIO = 21 # pin 40

class State() :
    IDLE = 0
    GREEN = 1
    AMBER = 2
    RED = 3
 
 
class Light():
 def __init__(self,gpio):
    self.gpio = gpio
    self.led = LED(gpio)
    self.state = 0
  
    def _on(self):
         if self.state == 0:
             self.led.on()
             self.state = 1
  
 def _off(self):
        if self.state == 1:
           self.led.off()
           self.state = 0

class Devices():
    def __init__(self):
        self.green_light = Light(GREEN_GPIO)
        self.amber_light = Light(AMBER_GPIO)
        self.red_light = Light(RED_GPIO)

    def response(self,state):
        if state == State.GREEN :
            self.amber_light._off()
            self.red_light._off()
            self.green_light._on()
        elif state == State.RED :
            self.amber_light._off()
            self.green_light._off()
            self.red_light._on() 
        elif state == State.AMBER :
            self.green_light._off()
            self.red_light._off()
            self.amber_light._on()
   
    def clean(self):
       self.green_light._off()
       self.amber_light._off()
       self.red_light._off()
  
class StateMachine():

    def __init__(self):
        self.state = State.IDLE
        self.devices = Devices()
        self.events = {}
        self.events[State.GREEN]=threading.Event()
        self.events[State.RED]=threading.Event()
        self.events[State.AMBER]=threading.Event()
      
        self.waiting_times={}
        self.waiting_times[State.GREEN] = 5
        self.waiting_times[State.RED] = 10
        self.waiting_times[State.AMBER] = 2
    
 
    def change_state(self):
        if self.state == State.IDLE :
            self.state = State.GREEN 
        elif self.state == State.GREEN :
            self.state = State.AMBER
        elif self.state == State.AMBER :
            self.state = State.RED
        elif self.state == State.RED :
            self.state = State.GREEN
   
        wt = self.waiting_times[self.state]
        self.devices.response(self.state)
        while not self.events[self.state].wait(wt):   
            self.change_state()
  
    def clean(self):
        self.devices.clean()
   
   

if __name__ == "__main__":
    state_mach = StateMachine()
    try:
        state_mach.change_state()
    except KeyboardInterrupt :
        state_mach.clean()
 



อีกตัวอย่างหนึ่งที่มี self transition ซึ่งก็คือรูปแบบที่ machine ไม่เปลี่ยนสถานะแต่คงอยู่ในสถานะเดิมต่อไปเมื่อมีเหตุการณ์บางอย่างเกิดขึ้น ผมขอยกตัวอย่างที่เห็นที่บ้านคือระบบเปิดไฟอัตโนมัติ เมื่อมีอะไรที่สะท้อนแสงได้เข้ามาใกล้ระยะ 2 เมตร ไฟจะเปิด (state = ON) เป็นเวลา 15 วินาที (event = TIME_OUT)  แล้วก็จะปิดตัวเอง (state = OFF) แต่ถ้าระหว่างที่ไฟยังไม่ดับลง (ยังไม่ครบ 15 วินาที) หากมีวัตถุเข้ามาอีก ระบบก็จะ reset เวลาใหม่ (event=RESET_TIMER)  แล้วต่อเวลาไปอีก 15 วินาที จะเห็นได้ว่า การมีวัตถุเข้ามาในระยะตรวจสอบในครั้งที่สอง สาม ... จะทำให้ machine  ยังคงรักษาสถานะ "ON" ต่อไป เราสามารถเขียนแผนภาพได้ดังนี้



สถานะมีสองสถานะคือ OFF และ ON เหตุการณ์ที่เปลี่ยนสถานะมีสองเหตุการณ์ คือ PUSH_BUTTON และ TIME_OUT  ระบบงานนี้จะมีการทำงานที่ซับซ้อนกว่าระบบที่ยกตัวอย่างมาก่อนหน้านี้ ดูแผนภาพการทำงาน




อธิบาย
1.  เมื่อผู้ใช้กดปุ่ม จะเกิดการเหตุการณ์ "PUSH_BUTTON" ขึ้นและถูกนำไปวางลงใน Queue

2. ระบบจะคอยตรวจสอบ Queue หากมีข้อมูลก็จะ pop ออกมา
    2.1 หากได้ข้อมูลที่ได้คือ PUSH_BUTTON ระบบจะทำการตรวจสอบว่า Machine อยู่ในสถานะใด   
            2.1.1 หากเป็น ON จะไม่ทำการ Turn Light On แต่จะข้ามไปทำการ reset timer และ start timer
            2.1.2 หากเป็น OFF จะทำการ Turn Light On แล้วไปทำการ start timer และเปลี่ยนสถานะเป็น ON

    2.2 หากได้ข้อมูลที่ได้คือ TIME_OUT ระบบจะทำการตรวจสอบว่า
             2.2.1 หากเป็น ON จะไม่ทำการ Turn Light Off ทำการ clear timer  และ เปลี่ยนสถานะเป็น OFF

ผมจะใช้การแยกการทำงานระหว่างตัว Main Program กับ Event ที่แทนการกดปุ่มของผู้ใช้ออกกัน เพราะมองว่าเหตุการณ์ของการกดปุ่มนั้นจะเกิดแบบสุ่ม (ผู้ใช้กดปุ่มเมื่อไหร่ก็ได้) จึงต้องมีการดูแลแยกออกไปต่างหาก ส่วน Event TIMEOUT ที่จริงแล้ว Python ก็จะทำการแยกเป็น Thread ออกไปอยู่แล้ว ส่วนการสื่อสารระหว่าง Thread ผมเลือกใช้การวาง Message ไว้บน Queue ซึ่งเป็นวิธีการหนึ่งในหลายวิธีที่ใช้กัน  ที่เลือกเพราะเราสามารถจะใส่ข้อมูลเพิ่มเติมบางอย่างที่เราต้องการได้ในอนาคต

มาเขียน code กัน





import threading
import time
import queue
import gpiozero 

# constants
LED_GPIO = 16 # pin 36
BUTTON_GPIO = 2 # pin 3
MAX_TIME = 5.0 # seconds

class State() :
 IDLE = -1
 ON= 1
 OFF = 0   
 
class Event():
 PUSH_BUTTON = 0
 TIME_OUT = 1  

 
class Light():
 def __init__(self,gpio):
  self.gpio = gpio
  self.led = gpiozero.LED(gpio)
  self.state = 0
  
 def _on(self):
  if self.state == 0:
   self.led.on()
   self.state = 1
  
 def _off(self):
  if self.state == 1:
   self.led.off()
   self.state = 0

class Devices():
 # something should be work when state change.
 def __init__(self,q):
  self.light = Light(LED_GPIO)
  self.evt_q = q
  self.timer = None  

 def response(self,state):
  self.cancel_timer()
  if state == State.ON :
   self.light._on() 
   self.start_timer()
   print("Timer started")
  elif state == State.OFF:
   self.light._off()

 def start_timer(self):
  self.timer = threading.Timer(MAX_TIME,self.on_timeout)
  self.timer.start()
  
 def cancel_timer(self):
  if self.timer :
   self.timer.cancel()
    
 def on_timeout(self):
  self.timer.cancel()
  print("Time out")
  self.evt_q.put(Event.TIME_OUT)    
    
 def clean(self):
  self.timer.cancel()
  self.light._off()

  
class StateMachine(threading.Thread):
 def __init__(self,q,stop_flag):
  threading.Thread.__init__(self)
  self.state = State.OFF
  self.devices = Devices(q)
  self.evt_q = q
  self.stop_flag = stop_flag 
      
 def run(self,timeout=1.0):#override method
  print("Running")
  try:
   while not self.stop_flag.isSet():
    if not self.evt_q.empty():
     evt = self.evt_q.get_nowait()
     self.callback(evt)
  finally:   
   threading.Thread.join(self,timeout) 
       
 def callback(self,evt):
  if evt == Event.PUSH_BUTTON:
   if self.state == State.OFF :
    self.state = State.ON
  elif evt == Event.TIME_OUT:
   if self.state == State.ON:
    self.state = State.OFF
  print(self.state)
  self.devices.response(self.state)
    
 
 
class PushButton(threading.Thread):  
 def __init__(self,q,stop_flag):
  threading.Thread.__init__(self)
  self.button = gpiozero.Button(BUTTON_GPIO)
  self.evt_q = q
  self.stop_flag = stop_flag
  
 def run(self,timeout=1.0): # override method
  try:
   while not self.stop_flag.is_set():
    self.button.when_pressed = self.callback
  finally:  
   threading.Thread.join(self,timeout) 
    
 def callback(self):
  self.evt_q.put(Event.PUSH_BUTTON)
    
  
if __name__ == "__main__":  
 stop_flag = threading.Event()
 evt_q = queue.Queue(1)
 state_mach = StateMachine(evt_q,stop_flag)
 push_btn = PushButton(evt_q,stop_flag)  
 try:
  state_mach.start()
  push_btn.start()
 except KeyboardInterrupt :
  stop_flag.set()
  print("Stopping")
  
 


















ไม่มีความคิดเห็น:

แสดงความคิดเห็น