State Machine (ชื่อเต็มคือ Finite - state Machine) เป็นตัวแบบในการคิดว่า machine จะเป็นได้เพียงหนึ่งสถานะเท่านั้น ณ เวลาใด เวลาหนึ่ง ตัวอย่างที่ยกกันมาบ่อย ๆ และเห็นกันได้ในชีวิตประจำวันคือ สัญญาณไฟจราจร (เขียว เหลือง แดง)
สัญญาณไฟจะมีสถานะที่เป็นไปได้สามสถานะคือ ไป (green) , ชลอ (amber) และหยุด (red) เวลาใดเวลาหนึ่งจะเป็นได้เพียงหนึ่งสถานะเท่านั้น สามารถเขียนแผนภาพได้ดังนี้
ในความเป็นจริงสัญญาณไฟจะไม่อยู่ในสถานะใดสถานะหนึ่งตลอดไปจะต้องมีการเปลี่ยนสถานะ สิ่งที่ทำให้เกิดการเปลี่ยนสถานะอาจเป็นเวลา เจ้าหน้าที่กดเปลี่ยนสัญญาณหรืออื่นๆ ในกรณีที่ใช้เวลา ไฟสัญญาณจะเปลี่ยนไปเมื่อถึงเวลาที่กำหนด
สร้างแบบจำลอง
ในความเป็นจริงสัญญาณไฟจะต้องมีจำนวนเท่ากับจำนวนแยก เช่น สามแยกก็ต้องมี 3 ระบบที่ทำงานสอดคล้องกัน แต่ในตอนนี้จะสร้างเพียงด้านเดียวเท่านั้น งานนี้เราต้องการตัวทำงาน 4 ตัวคือ State, Device , StateMachine และ Timer
State : เพื่อใช้เก็บข้อมูลของ State ที่เป็นไปได้ทั้งหมด
class State() :
IDLE = 0
GREEN = 1
AMBER = 2
RED = 3
Devices : เป็นตัวแทนของอุปกรณ์หรือ machine ที่จะต้องทำงานในระบบ
class Light():
def __init__(self,gpio):
self.gpio = gpio
self.led = LED(gpio)
self.state = 0
def _on(self):
if self.state == 0:
self.led.on()
self.state = 1
def _off(self):
if self.state == 1:
self.led.off()
self.state = 0
class Devices():
def __init__(self):
self.green_light = Light(GREEN_GPIO)
self.amber_light = Light(AMBER_GPIO)
self.red_light = Light(RED_GPIO)
def response(self,state):
if state == State.GREEN :
self.amber_light._off()
self.red_light._off()
self.green_light._on()
elif state == State.RED :
self.amber_light._off()
self.green_light._off()
self.red_light._on()
elif state == State.AMBER :
self.green_light._off()
self.red_light._off()
self.amber_light._on()
def clean(self):
self.green_light._off()
self.amber_light._off()
self.red_light._off()
State Machine : class ตัวหลักในการทำงาน
class StateMachine():
def __init__(self):
self.state = State.IDLE
self.devices = Devices()
def chage_state(self):
if self.state == State.IDLE :
self.state = State.GREEN
elif self.state == State.GREEN :
self.state = State.AMBER
elif self.state == State.AMBER :
self.state = State.RED
elif self.state == State.RED :
self.state = State.GREEN
self.devices.response(self.state)
def clean(self):
self.devices.clean()
เราต้องการ Looper หรือ Time interval ผมเลือกใช้ timeloop ซึ่งเป็น library หนึ่งที่ใช้งานง่าย และกำหนดให้ WAIT_TIME สำหรับการเปลี่ยนสัญญาณไฟมีค่าคงที่ 5 วินาที
WAIT_TIME = 5
state_mach = StateMachine()
looper = Timeloop()
@looper.job(interval=timedelta(seconds=WAIT_TIME))
def do_it():
global state_mach
print(time.ctime())
state_mach.chage_state()
looper.start()
เมื่อได้ตัวละครครบแล้วก็ประกอบเป็นงานขึ้นมา
import time
from datetime import timedelta
from timeloop import Timeloop
from gpiozero import LED
# constants
RED_GPIO = 16 # pin 36
AMBER_GPIO = 20 # pin 38
GREEN_GPIO = 21 # pin 40
WAIT_TIME = 5
class State() :
# assign values before being used
IDLE = 0
GREEN = 1
AMBER = 2
RED = 3
class Light():
def __init__(self,gpio):
self.gpio = gpio
self.led = LED(gpio)
self.state = 0 # 0 : off, 1 : on
def _on(self):
if self.state == 0:
self.led.on()
self.state = 1
def _off(self):
if self.state == 1:
self.led.off()
self.state = 0
class Devices():
def __init__(self):
self.green_light = Light(GREEN_GPIO)
self.amber_light = Light(AMBER_GPIO)
self.red_light = Light(RED_GPIO)
def response(self,state):
if state == State.GREEN :
self.amber_light._off()
self.red_light._off()
self.green_light._on()
elif state == State.RED :
self.amber_light._off()
self.green_light._off()
self.red_light._on()
elif state == State.AMBER :
self.green_light._off()
self.red_light._off()
self.amber_light._on()
def clean(self):
self.green_light._off()
self.amber_light._off()
self.red_light._off()
class StateMachine():
def __init__(self):
self.state = State.IDLE
self.devices = Devices()
def chage_state(self):
if self.state == State.IDLE :
self.state = State.GREEN
elif self.state == State.GREEN :
self.state = State.AMBER
elif self.state == State.AMBER :
self.state = State.RED
elif self.state == State.RED :
self.state = State.GREEN
self.devices.response(self.state)
def clean(self):
self.devices.clean()
state_mach = StateMachine()
looper = Timeloop()
@looper.job(interval=timedelta(seconds=WAIT_TIME))
def do_it():
global state_mach
print(time.ctime())
state_mach.chage_state()
looper.start()
while True :
try:
time.sleep(1)
except KeyboardInterrupt :
looper.stop()
state_mach.clean()
break
ต่อยอดไปอีกหน่อย ถ้าเราต้องการกำหนดให้ Time interval การแสดงสัญญาณไม่เท่ากัน เช่น แสดงไฟเขียว 5 วินาที ไฟเหลือง 2 วินาที และไฟแดง 10 วินาที ก็ย่อมได้ โดยการปรับปรุง code นิดหน่อย
import threading
from gpiozero import LED
# constants
RED_GPIO = 16 # pin 36
AMBER_GPIO = 20 # pin 38
GREEN_GPIO = 21 # pin 40
class State() :
IDLE = 0
GREEN = 1
AMBER = 2
RED = 3
class Light():
def __init__(self,gpio):
self.gpio = gpio
self.led = LED(gpio)
self.state = 0
def _on(self):
if self.state == 0:
self.led.on()
self.state = 1
def _off(self):
if self.state == 1:
self.led.off()
self.state = 0
class Devices():
def __init__(self):
self.green_light = Light(GREEN_GPIO)
self.amber_light = Light(AMBER_GPIO)
self.red_light = Light(RED_GPIO)
def response(self,state):
if state == State.GREEN :
self.amber_light._off()
self.red_light._off()
self.green_light._on()
elif state == State.RED :
self.amber_light._off()
self.green_light._off()
self.red_light._on()
elif state == State.AMBER :
self.green_light._off()
self.red_light._off()
self.amber_light._on()
def clean(self):
self.green_light._off()
self.amber_light._off()
self.red_light._off()
class StateMachine():
def __init__(self):
self.state = State.IDLE
self.devices = Devices()
self.events = {}
self.events[State.GREEN]=threading.Event()
self.events[State.RED]=threading.Event()
self.events[State.AMBER]=threading.Event()
self.waiting_times={}
self.waiting_times[State.GREEN] = 5
self.waiting_times[State.RED] = 10
self.waiting_times[State.AMBER] = 2
def change_state(self):
if self.state == State.IDLE :
self.state = State.GREEN
elif self.state == State.GREEN :
self.state = State.AMBER
elif self.state == State.AMBER :
self.state = State.RED
elif self.state == State.RED :
self.state = State.GREEN
wt = self.waiting_times[self.state]
self.devices.response(self.state)
while not self.events[self.state].wait(wt):
self.change_state()
def clean(self):
self.devices.clean()
if __name__ == "__main__":
state_mach = StateMachine()
try:
state_mach.change_state()
except KeyboardInterrupt :
state_mach.clean()
อีกตัวอย่างหนึ่งที่มี self transition ซึ่งก็คือรูปแบบที่ machine ไม่เปลี่ยนสถานะแต่คงอยู่ในสถานะเดิมต่อไปเมื่อมีเหตุการณ์บางอย่างเกิดขึ้น ผมขอยกตัวอย่างที่เห็นที่บ้านคือระบบเปิดไฟอัตโนมัติ เมื่อมีอะไรที่สะท้อนแสงได้เข้ามาใกล้ระยะ 2 เมตร ไฟจะเปิด (state = ON) เป็นเวลา 15 วินาที (event = TIME_OUT) แล้วก็จะปิดตัวเอง (state = OFF) แต่ถ้าระหว่างที่ไฟยังไม่ดับลง (ยังไม่ครบ 15 วินาที) หากมีวัตถุเข้ามาอีก ระบบก็จะ reset เวลาใหม่ (event=RESET_TIMER) แล้วต่อเวลาไปอีก 15 วินาที จะเห็นได้ว่า การมีวัตถุเข้ามาในระยะตรวจสอบในครั้งที่สอง สาม ... จะทำให้ machine ยังคงรักษาสถานะ "ON" ต่อไป เราสามารถเขียนแผนภาพได้ดังนี้
สถานะมีสองสถานะคือ OFF และ ON เหตุการณ์ที่เปลี่ยนสถานะมีสองเหตุการณ์ คือ PUSH_BUTTON และ TIME_OUT ระบบงานนี้จะมีการทำงานที่ซับซ้อนกว่าระบบที่ยกตัวอย่างมาก่อนหน้านี้ ดูแผนภาพการทำงาน
อธิบาย
1. เมื่อผู้ใช้กดปุ่ม จะเกิดการเหตุการณ์ "PUSH_BUTTON" ขึ้นและถูกนำไปวางลงใน Queue
2. ระบบจะคอยตรวจสอบ Queue หากมีข้อมูลก็จะ pop ออกมา
2.1 หากได้ข้อมูลที่ได้คือ PUSH_BUTTON ระบบจะทำการตรวจสอบว่า Machine อยู่ในสถานะใด
2.1.1 หากเป็น ON จะไม่ทำการ Turn Light On แต่จะข้ามไปทำการ reset timer และ start timer
2.1.2 หากเป็น OFF จะทำการ Turn Light On แล้วไปทำการ start timer และเปลี่ยนสถานะเป็น ON
2.2 หากได้ข้อมูลที่ได้คือ TIME_OUT ระบบจะทำการตรวจสอบว่า
2.2.1 หากเป็น ON จะไม่ทำการ Turn Light Off ทำการ clear timer และ เปลี่ยนสถานะเป็น OFF
ผมจะใช้การแยกการทำงานระหว่างตัว Main Program กับ Event ที่แทนการกดปุ่มของผู้ใช้ออกกัน เพราะมองว่าเหตุการณ์ของการกดปุ่มนั้นจะเกิดแบบสุ่ม (ผู้ใช้กดปุ่มเมื่อไหร่ก็ได้) จึงต้องมีการดูแลแยกออกไปต่างหาก ส่วน Event TIMEOUT ที่จริงแล้ว Python ก็จะทำการแยกเป็น Thread ออกไปอยู่แล้ว ส่วนการสื่อสารระหว่าง Thread ผมเลือกใช้การวาง Message ไว้บน Queue ซึ่งเป็นวิธีการหนึ่งในหลายวิธีที่ใช้กัน ที่เลือกเพราะเราสามารถจะใส่ข้อมูลเพิ่มเติมบางอย่างที่เราต้องการได้ในอนาคต
มาเขียน code กัน
import threading
import time
import queue
import gpiozero
# constants
LED_GPIO = 16 # pin 36
BUTTON_GPIO = 2 # pin 3
MAX_TIME = 5.0 # seconds
class State() :
IDLE = -1
ON= 1
OFF = 0
class Event():
PUSH_BUTTON = 0
TIME_OUT = 1
class Light():
def __init__(self,gpio):
self.gpio = gpio
self.led = gpiozero.LED(gpio)
self.state = 0
def _on(self):
if self.state == 0:
self.led.on()
self.state = 1
def _off(self):
if self.state == 1:
self.led.off()
self.state = 0
class Devices():
# something should be work when state change.
def __init__(self,q):
self.light = Light(LED_GPIO)
self.evt_q = q
self.timer = None
def response(self,state):
self.cancel_timer()
if state == State.ON :
self.light._on()
self.start_timer()
print("Timer started")
elif state == State.OFF:
self.light._off()
def start_timer(self):
self.timer = threading.Timer(MAX_TIME,self.on_timeout)
self.timer.start()
def cancel_timer(self):
if self.timer :
self.timer.cancel()
def on_timeout(self):
self.timer.cancel()
print("Time out")
self.evt_q.put(Event.TIME_OUT)
def clean(self):
self.timer.cancel()
self.light._off()
class StateMachine(threading.Thread):
def __init__(self,q,stop_flag):
threading.Thread.__init__(self)
self.state = State.OFF
self.devices = Devices(q)
self.evt_q = q
self.stop_flag = stop_flag
def run(self,timeout=1.0):#override method
print("Running")
try:
while not self.stop_flag.isSet():
if not self.evt_q.empty():
evt = self.evt_q.get_nowait()
self.callback(evt)
finally:
threading.Thread.join(self,timeout)
def callback(self,evt):
if evt == Event.PUSH_BUTTON:
if self.state == State.OFF :
self.state = State.ON
elif evt == Event.TIME_OUT:
if self.state == State.ON:
self.state = State.OFF
print(self.state)
self.devices.response(self.state)
class PushButton(threading.Thread):
def __init__(self,q,stop_flag):
threading.Thread.__init__(self)
self.button = gpiozero.Button(BUTTON_GPIO)
self.evt_q = q
self.stop_flag = stop_flag
def run(self,timeout=1.0): # override method
try:
while not self.stop_flag.is_set():
self.button.when_pressed = self.callback
finally:
threading.Thread.join(self,timeout)
def callback(self):
self.evt_q.put(Event.PUSH_BUTTON)
if __name__ == "__main__":
stop_flag = threading.Event()
evt_q = queue.Queue(1)
state_mach = StateMachine(evt_q,stop_flag)
push_btn = PushButton(evt_q,stop_flag)
try:
state_mach.start()
push_btn.start()
except KeyboardInterrupt :
stop_flag.set()
print("Stopping")
ไม่มีความคิดเห็น:
แสดงความคิดเห็น