ในบทความนี้จะกล่าวถึงการใช้ภาษา Python เป็นเครื่องมือสนับสนุนการทำงานกับ Multiple Sensors ครับ ในที่นี้ก็คือการทำงานแบบ Concurrency หรือการทำงานพร้อมกันหลายงาน
ในภาษา Python มีเครื่องมือสนับสนุนการทำงานแบบ Concurrency หรือ Parallel ได้แก่ Processes, Thread และ Coroutines ในบทความนี้ผมจะยกตัวอย่างการใช้ Coroutines (ออกเสียงว่า โค-รู-ทีน) ครับ เป็นคุณสมบัติที่มากับ Python ตั้งแต่รุ่น 2.5 และเริ่มกำหนดให้ติดตั้งเป็น standard library ในรุ่นที่ 3.4
หลักการของ Coroutines อาศัยหลักการแบ่งงานหลักออกเป็นงานย่อย งานย่อยเหล่านี้จะสลับกันใช้ทรัพยากรจากคอมพิวเตอร์ เนื่องจากคอมพิวเตอร์ทำงานได้เร็วมากเมื่อเทียบกับการรับรู้ของมนุษย์ การสลับงานไปมาเกิดขึ้นในช่วงเวลาที่สั้นมากจนมนุษย์รับรู้ว่าเป็นการทำงานพร้อมกันหลายงาน การที่จะทำแบบนี้ได้นั้นเราจะต้องใช้งาน Event-Loop ซึ่งจะทำหน้าที่คอยส่ง event หรือ message ออกไปสอบถาม Coroutines ต่าง ๆเพืิ่อติดตามสถานะ จะได้สลับงานได้เหมาะสม
Coroutines นี้ทำให้ Python ทำงานในแบบ Asynchronous คือ ระหว่างที่รอข้อมูลจาก I/O ซึ่งในที่นี้คือ GPIO Pin ตัวโปรแกรมสามารถทำงานอื่นไปพลางก่อนได้ ทำให้ CPU ไม่ต้องหยุดการทำงาน โดยหลักการนี้เป็นหลักการเดียวกับ Node.Js ใช้
ตัวอย่างโครงงาน
เพิื่อให้เห็นภาพชัดเจน ผมจะทำโครงงานระบบตรวจจับผู้บุกรุกโดยใช้ Sensors 3 ชนิด คือ Lux Sensor, Sound Sensor และ Hall Effect Sensor พิจารณาดูแล้วจะเห็นว่าในตัวอย่างนี้มีทั้งที่ใช้ I2C Bus (Lux Sensor) การต่อกับ Digital Pin (Sound และ Hall Effect sensors)อุปกรณ์
1. Raspberry Pi 3 Model B ติดตั้ง Raspbian Jessie release November 20162. Hall Effect Sensor (Keyes)
3. Sound Sensor FC-04
4. Lux Sensor TSL2561
5. Jump Wires
Light Sensor |
Sound Sensor |
Hall Effect Sensor |
หน้าที่ของ Sensor
1. Lux Sensor ใช้ในการตรวจสอบความสว่าง เพื่อให้ระบบรับทราบว่าเป็นช่วงกลางวัน (แสงมาก) หรือ กลางคืน (แสงน้อย) ระบบจะเข้าสู่ mode เฝ้าระวังในช่วงเวลาแสงน้อย2. Sound Sensor ใช้ตรวจจับเสียง ในเวลากลางวัน มีเสียงจากสภาพแวดล้อมเยอะ ค่าที่ได้อาจไม่ช่วยอะไรมาก แต่ในเวลากลางคืนควรจะเงียบ การตรวจเจอเสียงดัง จึงเป็นเรื่องที่ต้องสนใจ
3. Hall Effect Sensor ใช้ตรวจจับการเปิด - ปิด ประตู
Wiring
ต่อ Jump Wire ระหว่าง Sensor เข้ากับ Raspberry Pi ตามตารางข้างล่างนี้Sound Sensor | Raspberry Pi |
GND | PIN 6 (GND) |
VCC | PIN 2 (5 V) |
OUT | PIN 38 |
Hall Effect Sensor | Raspberry Pi |
GND | PIN 6 (GND) |
3V | PIN 1 (3.3 V) |
OUT | PIN 40 |
Lux Sensor | Raspberry Pi |
GND | PIN 6 (GND) |
3V | PIN 1 (3.3 V) |
SDA | PIN 3 (SDA) |
SCL | PIN 5 (SCL) |
เขียนโปรแกรม
1. เริ่มต้นด้วยการกำหนดรายละเอียดของงานที่ต้องทำ ในตัวอย่างนี้ผมกำหนดไว้ 3 งานคือ
1.1. รับค่าจาก Lux Sensor
@asyncio.coroutine
def detect_light():
global __EXIT_FLAG
while not __EXIT_FLAG :
lux = tlsr.get_lux()
logging.debug("Lux = {}".format(lux))
yield from asyncio.sleep(5 )
อธิบาย
@asyncio.coroutine เป็นการทำสิ่งที่เรียกว่า decoration เป็นเทคนิคหนึ่งในภาษา Python ที่ใช้เพื่อขยายขอบเขตการทำงานของฟังก์ชั่น เหมือนกับการตกแต่งภายในบ้านเพื่อให้ห้องแต่ละห้องทำหน้าที่ต่างกัน ในที่นี้เราได้ทำการตกแต่ง coroutine ให้ทำหน้าที่ตามที่กำหนดไว้ในฟังก์ชั่น detect_light()
1.2. รับค่าจาก Sound Sensor
@asyncio.coroutine
def detect_sound():
global __EXIT_FLAG,__SOUND_PIN_PIN,__SOUND_DETECTION_INT
last_detect = datetime.datetime.now()
sound_detected = False
while not __EXIT_FLAG :
yield from asyncio.sleep(0.005) # debounce for 5mSec
timestamp = time.time()
stamp = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
if GPIO.input(__SOUND_PIN) == GPIO.LOW :
last_detect = datetime.datetime.now()
if not sound_detected :
logging.debug('Sound is detected at {}'.format(stamp))
sound_detected = True
else :
if dif_millis(last_detect) > __SOUND_DETECTION_INT and sound_detected :
sound_detected = False
1.3. รับค่าจาก Hall Effect Sensor
@asyncio.coroutine
def detect_hall_effect():
global __EXIT_FLAG,__HALL_PIN,__HALL_DETECTION_INT
last_detect = datetime.datetime.now()
hall_detected = False
while not __EXIT_FLAG :
yield from asyncio.sleep(0.005) # debounce for 5mSec
timestamp = time.time()
stamp = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
if GPIO.input(__HALL_PIN) == GPIO.HIGH :
last_detect = datetime.datetime.now()
if not hall_detected :
logging.debug("Door is opened at {}".format(stamp))
hall_detected = True
else :
if dif_millis(last_detect) > __HALL_DETECTION_INT and hall_detected :
hall_detected = False
logging.debug("Door is closed at {}".format(stamp))
การทำงานในส่วนนี้จะเหมือนกับข้อ 1.2
2. สร้าง List ของงานทั้งหมด
หลังจากกำหนดรายละเอียดการทำงานแล้วขั้นตอนต่อไปคือ การสร้างตัวแปรเพื่อใช้อ้างอิงงานทั้งหมดแล้วเก็บไว้ใน List
tasks = [
asyncio.async(detect_light()),
asyncio.async(detect_sound()),
asyncio.async(detect_hall_effect())
]
ในขั้นตอนนี้มีการใช้คำสั่ง asyncio.async(function name) เพื่อนำเอา function ที่เราได้สร้างไว้ในขั้นตอนก่อนหน้านี้ให้เป็น Task Object ซึ่งจำเป็นสำหรับการทำงานกับ Coroutines และ Event-Loop แล้วก็นำมารวมกันไว้ในตัวแปรชื่อ tasks ซึ่งเป็นตัวแปรชนิด array (หรือ List )
3. เริ่มการทำงาน
ในขั้นตอนนี้ เราต้องสร้างตัวแปร Event-Loop ขึ้นมาก่อน
loop = asyncio.get_event_loop()
จากนั้นก็รวบงานทั้งหมดส่งให้ Event-Loop ที่เราสร้างขึ้นดูแลต่อ
loop.run_until_complete(asyncio.gather(*tasks))
ที่นี้มาดู code ฉบับเต็มกัน
from TSL2561 import TSL2561
import RPi.GPIO as GPIO
import time
import datetime
import asyncio
import smbus
import concurrent.futures
import logging
def dif_millis(start_time):
dt = datetime.datetime.now() - start_time
ms = (dt.days * 24 * 60 * 60 +dt.seconds) + 1000 + dt.microseconds / 1000.00
return int(ms)
@asyncio.coroutine
def detect_light():
global __EXIT_FLAG
while not __EXIT_FLAG :
lux = tsl.get_lux()
logging.debug("Lux = {}".format(lux))
yield from asyncio.sleep(5 )
@asyncio.coroutine
def detect_sound():
global __EXIT_FLAG
last_detect = datetime.datetime.now()
sound_detected = False
while not __EXIT_FLAG :
yield from asyncio.sleep(0.005) # debounce for 5mSec
timestamp = time.time()
stamp = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
if GPIO.input(__SOUND_PIN) == GPIO.LOW :
last_detect = datetime.datetime.now()
if not sound_detected :
logging.debug('Sound is detected at {}'.format(stamp))
sound_detected = True
else :
if dif_millis(last_detect) > __SOUND_DETECTION_INT and sound_detected :
sound_detected = False
@asyncio.coroutine
def detect_hall_effect():
global __EXIT_FLAG
last_detect = datetime.datetime.now()
hall_detected = False
while not __EXIT_FLAG :
yield from asyncio.sleep(0.005) # debounce for 5mSec
timestamp = time.time()
stamp = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
if GPIO.input(__HALL_PIN) == GPIO.HIGH :
last_detect = datetime.datetime.now()
if not hall_detected :
logging.debug("Door is opened at {}".format(stamp))
hall_detected = True
else :
if dif_millis(last_detect) > __HALL_DETECTION_INT and hall_detected :
hall_detected = False
logging.debug("Door is closed at {}".format(stamp))
__SOUND_DETECTION_INT = 1000 # milliseconds
__HALL_DETECTION_INT = 1000 # milliseconds
__EXIT_FLAG = False
__HALL_PIN = 21 # GPIO 21 / PIN 40
__SOUND_PIN = 20 # GPIO 20 / PIN 38
GPIO.setmode(GPIO.BCM)
GPIO.setup(__HALL_PIN , GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(__SOUND_PIN , GPIO.IN, pull_up_down=GPIO.PUD_UP)
tsl = TSL2561(addr=0x39,bus=smbus.SMBus(1),chan=1)
tsl._start()
logging.basicConfig(filename="log2.log",level=logging.DEBUG)
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(detect_light()),
asyncio.async(detect_sound()),
asyncio.async(detect_hall_effect())
]
try :
loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt :
__EXIT_FLAG = True
finally :
loop.close()
GPIO.remove_event_detect([__SOUND_PIN,__HALL_PIN])
GPIO.cleanup([__SOUND_PIN,__HALL_PIN])
tsl._stop()
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:root:Lux = 17.346478870633813
DEBUG:root:Lux = 5.548747240097244
DEBUG:root:Door is opened at 12:13:07
DEBUG:root:Door is closed at 12:13:10
DEBUG:root:Lux = 8.808611840286067
DEBUG:root:Lux = 27.75873259601299
DEBUG:root:Lux = 27.723287955989885
DEBUG:root:Lux = 27.727077203136425
DEBUG:root:Sound is detected at 12:14:40
DEBUG:root:Lux = 27.017581143326712
DEBUG:root:Sound is detected at 12:14:56
DEBUG:root:Sound is detected at 12:14:56
DEBUG:root:Lux = 26.81253108798512
...
DEBUG:root:Sound is detected at 12:17:10
DEBUG:root:Lux = 26.366921389191248
DEBUG:root:Lux = 26.34711877533246
DEBUG:root:Sound is detected at 12:17:44
DEBUG:root:Lux = 26.29586354677271
DEBUG:root:Sound is detected at 12:17:50
DEBUG:root:Lux = 26.59173049821303
DEBUG:root:Sound is detected at 12:18:34
DEBUG:root:Lux = 26.70998114137388
...
DEBUG:root:Sound is detected at 12:19:47
DEBUG:root:Sound is detected at 12:23:51
DEBUG:root:Lux = 30.968579381646993
DEBUG:root:Lux = 31.20910025120154
DEBUG:root:Sound is detected at 12:24:04
DEBUG:root:Lux = 29.671160009417658
สิ่งที่ควรพัฒนาต่อไปคือการนำเอาข้อมูลจาก Sensors มาตีความให้เป็นระบบมากขึ้น และพัฒนาระบบที่จะมารองรับหลังการตีความข้อมูลเสร็จแล้วต่อไป
เรื่องการใช้ Coroutines ในภาษา Python อาจจะใหม่สักหน่อยสำหรับท่านที่ยังไม่คุ้นเคย แต่ถ้ามองจุดหลักแล้วก็มีจุดที่ต้องทำความเข้าใจอยู่ไม่กี่จุด ได้แก่ การใช้ coroutine decoration เพื่อให้ฟังก์ชั่นที่เขาเขียนขึ้นมาทำงานเป็น Coroutines ได้ ถัดมาคือการนำเอาฟังก์ชั่นมาสร้างเป็น Task object และสร้างตัวแปร Event-loop ขึ้นมาเพื่อดูแลเรื่อง schedule ที่ใช้ในการสลับงานไปมาระหว่างฟังก์ชั่นที่สร้างไว้ ก็น่าจะมีเท่านี้
ปล. การใช้ Coroutine จะต้องนำเข้า library ชื่อ asyncio ก่อน ซึ่ง library นี้จะถูกติดตั้งมาใน python3.4 หรือใหม่กว่าเท่านั้น และใน Pythonรุ่นที่ 3.5 เป็นต้นไป Syntax สำหรับการใช้ Coroutine มีการเปลี่ยนแปลงไป จะใช้ Code ตามตัวอย่างนี้ไม่ได้แล้วนะครับ แต่ที่ยกมาก็เพราะว่าต้องการให้ใช้ได้กับ Raspbian Jessie ซึ่งยังมีแต่ Python3.4 ไม่มี Python3.5
Download code : https://github.com/somchaisomph/blogs/tree/master/multiple-sensors
ไม่มีความคิดเห็น:
แสดงความคิดเห็น